(Resending without the confidential footer; I think I figured it out. Please ignore my previous email in this thread.)
-- Ola On Fri, 2019-01-18 at 09:23 -0600, Gage Eads wrote: > This commit adds support for non-blocking circular ring enqueue and dequeue > functions. The ring uses a 128-bit compare-and-swap instruction, and thus > is currently limited to x86_64. > > The algorithm is based on the original rte ring (derived from FreeBSD's > bufring.h) and inspired by Michael and Scott's non-blocking concurrent > queue. Importantly, it adds a modification counter to each ring entry to > ensure only one thread can write to an unused entry. > > ----- > Algorithm: > > Multi-producer non-blocking enqueue: > 1. Move the producer head index 'n' locations forward, effectively > reserving 'n' locations. > 2. For each pointer: > a. Read the producer tail index, then ring[tail]. If ring[tail]'s > modification counter isn't 'tail', retry. > b. Construct the new entry: {pointer, tail + ring size} > c. Compare-and-swap the old entry with the new. If unsuccessful, the > next loop iteration will try to enqueue this pointer again. > d. Compare-and-swap the tail index with 'tail + 1', whether or not step 2c > succeeded. This guarantees threads can make forward progress. > > Multi-consumer non-blocking dequeue: > 1. Move the consumer head index 'n' locations forward, effectively > reserving 'n' pointers to be dequeued. > 2. Copy 'n' pointers into the caller's object table (ignoring the > modification counter), starting from ring[tail], then compare-and-swap > the tail index with 'tail + n'. If unsuccessful, repeat step 2. > > ----- > Discussion: > > There are two cases where the ABA problem is mitigated: > 1. Enqueueing a pointer to the ring: without a modification counter > tied to the tail index, the index could become stale by the time the > enqueue happens, causing it to overwrite valid data. Tying the > counter to the tail index gives us an expected value (as opposed to, > say, a monotonically incrementing counter). > > Since the counter will eventually wrap, there is potential for the ABA > problem. However, using a 64-bit counter makes this likelihood > effectively zero. > > 2. Updating a tail index: the ABA problem can occur if the thread is > preempted and the tail index wraps around. However, using 64-bit indexes > makes this likelihood effectively zero. > > With no contention, an enqueue of n pointers uses (1 + 2n) CAS operations > and a dequeue of n pointers uses 2. This algorithm has worse average-case > performance than the regular rte ring (particularly a highly-contended ring > with large bulk accesses), however: > - For applications with preemptible pthreads, the regular rte ring's > worst-case performance (i.e. one thread being preempted in the > update_tail() critical section) is much worse than the non-blocking > ring's. > - Software caching can mitigate the average case performance for ring-based > algorithms. For example, a non-blocking ring based mempool (a likely use > case for this ring) with per-thread caching. > > The non-blocking ring is enabled via a new flag, RING_F_NB. Because the > ring's memsize is now a function of its flags (the non-blocking ring > requires 128b for each entry), this commit adds a new argument ('flags') to > rte_ring_get_memsize(). An API deprecation notice will be sent in a > separate commit. > > For ease-of-use, existing ring enqueue and dequeue functions work on both > regular and non-blocking rings. This introduces an additional branch in > the datapath, but this should be a highly predictable branch. 
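Condensing the multi-producer enqueue steps 2a-2d above into a rough C sketch helped me review it, so I include it here. This is my own simplification, not code from the patch: the helper name, the plain struct and the use of the GCC __atomic builtins (which need a 16-byte aligned entry and -mcx16, or libatomic, for the 128-bit CAS) are all mine.

#include <stdint.h>

/* Rough sketch (mine) of steps 2a-2d of the MP enqueue for one pointer. */
struct entry {
        void *ptr;
        uint64_t cnt;
} __attribute__((aligned(16)));

static void
enqueue_one(struct entry *ring, uint64_t mask, uint64_t size,
            uint64_t *prod_tail, void *obj)
{
        for (;;) {
                /* 2a: read the producer tail and the entry it indexes */
                uint64_t tail = __atomic_load_n(prod_tail, __ATOMIC_ACQUIRE);
                struct entry *slot = &ring[tail & mask];
                struct entry old_e = *slot;

                if (old_e.cnt != tail)  /* slot already written this lap */
                        continue;       /* 2a: retry with a fresh tail */

                /* 2b: new entry = {pointer, tail + ring size} */
                struct entry new_e = { obj, tail + size };

                /* 2c: try to claim the slot with a 128-bit CAS */
                int claimed = __atomic_compare_exchange(slot, &old_e, &new_e,
                                0, __ATOMIC_RELEASE, __ATOMIC_RELAXED);

                /* 2d: help advance the tail whether or not 2c succeeded */
                __atomic_compare_exchange_n(prod_tail, &tail, tail + 1, 0,
                                __ATOMIC_RELEASE, __ATOMIC_RELAXED);

                if (claimed)
                        return;
        }
}

More comments on the memory ordering and on the retry path further down.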
> ring_perf_autotest shows a negligible performance impact; it's hard to > distinguish a real difference versus system noise. > > | ring_perf_autotest cycles with branch - > Test | ring_perf_autotest cycles without > ------------------------------------------------------------------ > SP/SC single enq/dequeue | 0.33 > MP/MC single enq/dequeue | -4.00 > SP/SC burst enq/dequeue (size 8) | 0.00 > MP/MC burst enq/dequeue (size 8) | 0.00 > SP/SC burst enq/dequeue (size 32) | 0.00 > MP/MC burst enq/dequeue (size 32) | 0.00 > SC empty dequeue | 1.00 > MC empty dequeue | 0.00 > > Single lcore: > SP/SC bulk enq/dequeue (size 8) | 0.49 > MP/MC bulk enq/dequeue (size 8) | 0.08 > SP/SC bulk enq/dequeue (size 32) | 0.07 > MP/MC bulk enq/dequeue (size 32) | 0.09 > > Two physical cores: > SP/SC bulk enq/dequeue (size 8) | 0.19 > MP/MC bulk enq/dequeue (size 8) | -0.37 > SP/SC bulk enq/dequeue (size 32) | 0.09 > MP/MC bulk enq/dequeue (size 32) | -0.05 > > Two NUMA nodes: > SP/SC bulk enq/dequeue (size 8) | -1.96 > MP/MC bulk enq/dequeue (size 8) | 0.88 > SP/SC bulk enq/dequeue (size 32) | 0.10 > MP/MC bulk enq/dequeue (size 32) | 0.46 > > Test setup: x86_64 build with default config, dual-socket Xeon E5-2699 v4, > running on isolcpus cores with a tickless scheduler. Each test run three > times and the results averaged. > > Signed-off-by: Gage Eads <gage.e...@intel.com> > --- > lib/librte_ring/rte_ring.c | 72 ++++- > lib/librte_ring/rte_ring.h | 550 +++++++++++++++++++++++++++++++++- > - > lib/librte_ring/rte_ring_version.map | 7 + > 3 files changed, 587 insertions(+), 42 deletions(-) > > diff --git a/lib/librte_ring/rte_ring.c b/lib/librte_ring/rte_ring.c > index d215acecc..f3378dccd 100644 > --- a/lib/librte_ring/rte_ring.c > +++ b/lib/librte_ring/rte_ring.c > @@ -45,9 +45,9 @@ EAL_REGISTER_TAILQ(rte_ring_tailq) > > /* return the size of memory occupied by a ring */ > ssize_t > -rte_ring_get_memsize(unsigned count) > +rte_ring_get_memsize_v1905(unsigned int count, unsigned int flags) > { > - ssize_t sz; > + ssize_t sz, elt_sz; > > /* count must be a power of 2 */ > if ((!POWEROF2(count)) || (count > RTE_RING_SZ_MASK )) { > @@ -57,10 +57,23 @@ rte_ring_get_memsize(unsigned count) > return -EINVAL; > } > > - sz = sizeof(struct rte_ring) + count * sizeof(void *); > + elt_sz = (flags & RING_F_NB) ? 2 * sizeof(void *) : sizeof(void *); > + > + sz = sizeof(struct rte_ring) + count * elt_sz; > sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE); > return sz; > } > +BIND_DEFAULT_SYMBOL(rte_ring_get_memsize, _v1905, 19.05); > +MAP_STATIC_SYMBOL(ssize_t rte_ring_get_memsize(unsigned int count, > + unsigned int flags), > + rte_ring_get_memsize_v1905); > + > +ssize_t > +rte_ring_get_memsize_v20(unsigned int count) > +{ > + return rte_ring_get_memsize_v1905(count, 0); > +} > +VERSION_SYMBOL(rte_ring_get_memsize, _v20, 2.0); > > int > rte_ring_init(struct rte_ring *r, const char *name, unsigned count, > @@ -82,8 +95,6 @@ rte_ring_init(struct rte_ring *r, const char *name, unsigned > count, > if (ret < 0 || ret >= (int)sizeof(r->name)) > return -ENAMETOOLONG; > r->flags = flags; > - r->prod.single = (flags & RING_F_SP_ENQ) ? __IS_SP : __IS_MP; > - r->cons.single = (flags & RING_F_SC_DEQ) ? 
__IS_SC : __IS_MC; > > if (flags & RING_F_EXACT_SZ) { > r->size = rte_align32pow2(count + 1); > @@ -100,8 +111,30 @@ rte_ring_init(struct rte_ring *r, const char *name, > unsigned count, > r->mask = count - 1; > r->capacity = r->mask; > } > - r->prod.head = r->cons.head = 0; > - r->prod.tail = r->cons.tail = 0; > + > + if (flags & RING_F_NB) { > + uint64_t i; > + > + r->prod_64.single = (flags & RING_F_SP_ENQ) ? __IS_SP : > __IS_MP; > + r->cons_64.single = (flags & RING_F_SC_DEQ) ? __IS_SC : > __IS_MC; > + r->prod_64.head = r->cons_64.head = 0; > + r->prod_64.tail = r->cons_64.tail = 0; > + > + for (i = 0; i < r->size; i++) { > + struct nb_ring_entry *ring_ptr, *base; > + > + base = ((struct nb_ring_entry *)&r[1]); > + > + ring_ptr = &base[i & r->mask]; > + > + ring_ptr->cnt = i; > + } > + } else { > + r->prod.single = (flags & RING_F_SP_ENQ) ? __IS_SP : __IS_MP; > + r->cons.single = (flags & RING_F_SC_DEQ) ? __IS_SC : __IS_MC; > + r->prod.head = r->cons.head = 0; > + r->prod.tail = r->cons.tail = 0; > + } > > return 0; > } > @@ -123,11 +156,19 @@ rte_ring_create(const char *name, unsigned count, int > socket_id, > > ring_list = RTE_TAILQ_CAST(rte_ring_tailq.head, rte_ring_list); > > +#if !defined(RTE_ARCH_X86_64) > + if (flags & RING_F_NB) { > + printf("RING_F_NB is only supported on x86-64 platforms\n"); > + rte_errno = EINVAL; > + return NULL; > + } > +#endif > + > /* for an exact size ring, round up from count to a power of two */ > if (flags & RING_F_EXACT_SZ) > count = rte_align32pow2(count + 1); > > - ring_size = rte_ring_get_memsize(count); > + ring_size = rte_ring_get_memsize(count, flags); > if (ring_size < 0) { > rte_errno = ring_size; > return NULL; > @@ -227,10 +268,17 @@ rte_ring_dump(FILE *f, const struct rte_ring *r) > fprintf(f, " flags=%x\n", r->flags); > fprintf(f, " size=%"PRIu32"\n", r->size); > fprintf(f, " capacity=%"PRIu32"\n", r->capacity); > - fprintf(f, " ct=%"PRIu32"\n", r->cons.tail); > - fprintf(f, " ch=%"PRIu32"\n", r->cons.head); > - fprintf(f, " pt=%"PRIu32"\n", r->prod.tail); > - fprintf(f, " ph=%"PRIu32"\n", r->prod.head); > + if (r->flags & RING_F_NB) { > + fprintf(f, " ct=%"PRIu64"\n", r->cons_64.tail); > + fprintf(f, " ch=%"PRIu64"\n", r->cons_64.head); > + fprintf(f, " pt=%"PRIu64"\n", r->prod_64.tail); > + fprintf(f, " ph=%"PRIu64"\n", r->prod_64.head); > + } else { > + fprintf(f, " ct=%"PRIu32"\n", r->cons.tail); > + fprintf(f, " ch=%"PRIu32"\n", r->cons.head); > + fprintf(f, " pt=%"PRIu32"\n", r->prod.tail); > + fprintf(f, " ph=%"PRIu32"\n", r->prod.head); > + } > fprintf(f, " used=%u\n", rte_ring_count(r)); > fprintf(f, " avail=%u\n", rte_ring_free_count(r)); > } > diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h > index b270a4746..08c9de6a6 100644 > --- a/lib/librte_ring/rte_ring.h > +++ b/lib/librte_ring/rte_ring.h > @@ -134,6 +134,18 @@ struct rte_ring { > */ > #define RING_F_EXACT_SZ 0x0004 > #define RTE_RING_SZ_MASK (0x7fffffffU) /**< Ring size mask */ > +/** > + * The ring uses non-blocking enqueue and dequeue functions. These functions > + * do not have the "non-preemptive" constraint of a regular rte ring, and > thus > + * are suited for applications using preemptible pthreads. However, the > + * non-blocking functions have worse average-case performance than their > + * regular rte ring counterparts. When used as the handler for a mempool, > + * per-thread caching can mitigate the performance difference by reducing the > + * number (and contention) of ring accesses. 
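As a usage note for readers of this doc block: with this patch a non-blocking ring would presumably be created simply by passing the new flag at creation time. The ring name and size below are mine, just for illustration:

/* Hypothetical usage: create a non-blocking MP/MC ring and report the
 * error if the flag is unsupported (rte_errno == EINVAL per the doc below).
 */
struct rte_ring *r = rte_ring_create("nb_ring", 1024, rte_socket_id(),
                                     RING_F_NB);
if (r == NULL)
        printf("rte_ring_create failed: %s\n", rte_strerror(rte_errno));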
> + * > + * This flag is only supported on x86_64 platforms. > + */ > +#define RING_F_NB 0x0008 > > /* @internal defines for passing to the enqueue dequeue worker functions */ > #define __IS_SP 1 > @@ -151,11 +163,15 @@ struct rte_ring { > * > * @param count > * The number of elements in the ring (must be a power of 2). > + * @param flags > + * The flags the ring will be created with. > * @return > * - The memory size needed for the ring on success. > * - -EINVAL if count is not a power of 2. > */ > -ssize_t rte_ring_get_memsize(unsigned count); > +ssize_t rte_ring_get_memsize(unsigned int count, unsigned int flags); > +ssize_t rte_ring_get_memsize_v20(unsigned int count); > +ssize_t rte_ring_get_memsize_v1905(unsigned int count, unsigned int flags); > > /** > * Initialize a ring structure. > @@ -188,6 +204,10 @@ ssize_t rte_ring_get_memsize(unsigned count); > * - RING_F_SC_DEQ: If this flag is set, the default behavior when > * using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()`` > * is "single-consumer". Otherwise, it is "multi-consumers". > + * - RING_F_EXACT_SZ: If this flag is set, count can be a non-power-of-2 > + * number, but up to half the ring space may be wasted. > + * - RING_F_NB: (x86_64 only) If this flag is set, the ring uses > + * non-blocking variants of the dequeue and enqueue functions. > * @return > * 0 on success, or a negative value on error. > */ > @@ -223,12 +243,17 @@ int rte_ring_init(struct rte_ring *r, const char *name, > unsigned count, > * - RING_F_SC_DEQ: If this flag is set, the default behavior when > * using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()`` > * is "single-consumer". Otherwise, it is "multi-consumers". > + * - RING_F_EXACT_SZ: If this flag is set, count can be a non-power-of-2 > + * number, but up to half the ring space may be wasted. > + * - RING_F_NB: (x86_64 only) If this flag is set, the ring uses > + * non-blocking variants of the dequeue and enqueue functions. > * @return > * On success, the pointer to the new allocated ring. NULL on error with > * rte_errno set appropriately. Possible errno values include: > * - E_RTE_NO_CONFIG - function could not get pointer to rte_config > structure > * - E_RTE_SECONDARY - function was called from a secondary process > instance > - * - EINVAL - count provided is not a power of 2 > + * - EINVAL - count provided is not a power of 2, or RING_F_NB is used on > an > + * unsupported platform > * - ENOSPC - the maximum number of memzones has already been allocated > * - EEXIST - a memzone with the same name already exists > * - ENOMEM - no appropriate memory area found in which to create memzone > @@ -284,6 +309,50 @@ void rte_ring_dump(FILE *f, const struct rte_ring *r); > } \ > } while (0) > > +/* The actual enqueue of pointers on the ring. > + * Used only by the single-producer non-blocking enqueue function, but > + * out-lined here for code readability. 
> + */ > +#define ENQUEUE_PTRS_NB(r, ring_start, prod_head, obj_table, n) do { \ > + unsigned int i; \ > + const uint32_t size = (r)->size; \ > + size_t idx = prod_head & (r)->mask; \ > + size_t new_cnt = prod_head + size; \ > + struct nb_ring_entry *ring = (struct nb_ring_entry *)ring_start; \ > + if (likely(idx + n < size)) { \ > + for (i = 0; i < (n & ((~(unsigned)0x3))); i += 4, idx += 4) { > \ > + ring[idx].ptr = obj_table[i]; \ > + ring[idx].cnt = new_cnt + i; \ > + ring[idx + 1].ptr = obj_table[i + 1]; \ > + ring[idx + 1].cnt = new_cnt + i + 1; \ > + ring[idx + 2].ptr = obj_table[i + 2]; \ > + ring[idx + 2].cnt = new_cnt + i + 2; \ > + ring[idx + 3].ptr = obj_table[i + 3]; \ > + ring[idx + 3].cnt = new_cnt + i + 3; \ > + } \ > + switch (n & 0x3) { \ > + case 3: \ > + ring[idx].cnt = new_cnt + i; \ > + ring[idx++].ptr = obj_table[i++]; /* fallthrough */ \ > + case 2: \ > + ring[idx].cnt = new_cnt + i; \ > + ring[idx++].ptr = obj_table[i++]; /* fallthrough */ \ > + case 1: \ > + ring[idx].cnt = new_cnt + i; \ > + ring[idx++].ptr = obj_table[i++]; \ > + } \ > + } else { \ > + for (i = 0; idx < size; i++, idx++) { \ > + ring[idx].cnt = new_cnt + i; \ > + ring[idx].ptr = obj_table[i]; \ > + } \ > + for (idx = 0; i < n; i++, idx++) { \ > + ring[idx].cnt = new_cnt + i; \ > + ring[idx].ptr = obj_table[i]; \ > + } \ > + } \ > +} while (0) > + > /* the actual copy of pointers on the ring to obj_table. > * Placed here since identical code needed in both > * single and multi consumer dequeue functions */ > @@ -315,6 +384,39 @@ void rte_ring_dump(FILE *f, const struct rte_ring *r); > } \ > } while (0) > > +/* The actual copy of pointers on the ring to obj_table. > + * Placed here since identical code needed in both > + * single and multi consumer non-blocking dequeue functions. > + */ > +#define DEQUEUE_PTRS_NB(r, ring_start, cons_head, obj_table, n) do { \ > + unsigned int i; \ > + size_t idx = cons_head & (r)->mask; \ > + const uint32_t size = (r)->size; \ > + struct nb_ring_entry *ring = (struct nb_ring_entry *)ring_start; \ > + if (likely(idx + n < size)) { \ > + for (i = 0; i < (n & (~(unsigned)0x3)); i += 4, idx += 4) {\ > + obj_table[i] = ring[idx].ptr; \ > + obj_table[i + 1] = ring[idx + 1].ptr; \ > + obj_table[i + 2] = ring[idx + 2].ptr; \ > + obj_table[i + 3] = ring[idx + 3].ptr; \ > + } \ > + switch (n & 0x3) { \ > + case 3: \ > + obj_table[i++] = ring[idx++].ptr; /* fallthrough */ \ > + case 2: \ > + obj_table[i++] = ring[idx++].ptr; /* fallthrough */ \ > + case 1: \ > + obj_table[i++] = ring[idx++].ptr; \ > + } \ > + } else { \ > + for (i = 0; idx < size; i++, idx++) \ > + obj_table[i] = ring[idx].ptr; \ > + for (idx = 0; i < n; i++, idx++) \ > + obj_table[i] = ring[idx].ptr; \ > + } \ > +} while (0) > + > + > /* Between load and load. there might be cpu reorder in weak model > * (powerpc/arm). > * There are 2 choices for the users > @@ -331,6 +433,319 @@ void rte_ring_dump(FILE *f, const struct rte_ring *r); > #endif > #include "rte_ring_generic_64.h" > > +/* @internal 128-bit structure used by the non-blocking ring */ > +struct nb_ring_entry { > + void *ptr; /**< Data pointer */ > + uint64_t cnt; /**< Modification counter */ Why not make 'cnt' uintptr_t? This way 32-bit architectures will also be supported. I think there are some claims that DPDK still supports e.g. ARMv7a and possibly also 32-bit x86? 
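Something like this is what I have in mind; a sketch of the alternative layout only, not something I have tested:

#include <stdint.h>

/* The entry stays at 2 * sizeof(void *), so the double-pointer-width CAS
 * is cmpxchg16b on x86-64 and cmpxchg8b on 32-bit x86 (or the equivalent
 * LL/SC pairs on Arm).
 */
struct nb_ring_entry {
        void *ptr;      /**< Data pointer */
        uintptr_t cnt;  /**< Modification counter, pointer-sized */
};

The trade-off is of course a narrower counter on 32-bit targets, so the ABA argument in the commit message would need to be revisited there.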
> +}; > + > +/* The non-blocking ring algorithm is based on the original rte ring (derived > + * from FreeBSD's bufring.h) and inspired by Michael and Scott's non-blocking > + * concurrent queue. > + */ > + > +/** > + * @internal > + * Enqueue several objects on the non-blocking ring (single-producer only) > + * > + * @param r > + * A pointer to the ring structure. > + * @param obj_table > + * A pointer to a table of void * pointers (objects). > + * @param n > + * The number of objects to add in the ring from the obj_table. > + * @param behavior > + * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items to the ring > + * RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to the ring > + * @param free_space > + * returns the amount of space after the enqueue operation has finished > + * @return > + * Actual number of objects enqueued. > + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. > + */ > +static __rte_always_inline unsigned int > +__rte_ring_do_nb_enqueue_sp(struct rte_ring *r, void * const *obj_table, > + unsigned int n, > + enum rte_ring_queue_behavior behavior, > + unsigned int *free_space) > +{ > + uint32_t free_entries; > + size_t head, next; > + > + n = __rte_ring_move_prod_head_64(r, 1, n, behavior, > + &head, &next, &free_entries); > + if (n == 0) > + goto end; > + > + ENQUEUE_PTRS_NB(r, &r[1], head, obj_table, n); > + > + r->prod_64.tail += n; Don't we need release order when (or smp_wmb between) writing of the ring pointers and the update of tail? By updating the tail pointer, we are synchronising with a consumer. I prefer using __atomic operations even for load and store. You can see which parts of the code that synchronise with each other, e.g. store-release to some location synchronises with load-acquire from the same location. If you don't know how different threads synchronise with each other, you are very likely to make mistakes. > + > +end: > + if (free_space != NULL) > + *free_space = free_entries - n; > + return n; > +} > + > +/** > + * @internal > + * Enqueue several objects on the non-blocking ring (multi-producer safe) > + * > + * @param r > + * A pointer to the ring structure. > + * @param obj_table > + * A pointer to a table of void * pointers (objects). > + * @param n > + * The number of objects to add in the ring from the obj_table. > + * @param behavior > + * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items to the ring > + * RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to the ring > + * @param free_space > + * returns the amount of space after the enqueue operation has finished > + * @return > + * Actual number of objects enqueued. > + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. > + */ > +static __rte_always_inline unsigned int > +__rte_ring_do_nb_enqueue_mp(struct rte_ring *r, void * const *obj_table, > + unsigned int n, > + enum rte_ring_queue_behavior behavior, > + unsigned int *free_space) > +{ > +#if !defined(RTE_ARCH_X86_64) || !defined(ALLOW_EXPERIMENTAL_API) > + RTE_SET_USED(r); > + RTE_SET_USED(obj_table); > + RTE_SET_USED(n); > + RTE_SET_USED(behavior); > + RTE_SET_USED(free_space); > +#ifndef ALLOW_EXPERIMENTAL_API > + printf("[%s()] RING_F_NB requires an experimental API." 
> + " Recompile with ALLOW_EXPERIMENTAL_API to use it.\n" > + , __func__); > +#endif > + return 0; > +#endif > +#if defined(RTE_ARCH_X86_64) && defined(ALLOW_EXPERIMENTAL_API) > + size_t head, next, tail; > + uint32_t free_entries; > + unsigned int i; > + > + n = __rte_ring_move_prod_head_64(r, 0, n, behavior, > + &head, &next, &free_entries); > + if (n == 0) > + goto end; > + > + for (i = 0; i < n; /* i incremented if enqueue succeeds */) { > + struct nb_ring_entry old_value, new_value; > + struct nb_ring_entry *ring_ptr; > + > + /* Enqueue to the tail entry. If another thread wins the > race, > + * retry with the new tail. > + */ > + tail = r->prod_64.tail; > + > + ring_ptr = &((struct nb_ring_entry *)&r[1])[tail & r->mask]; This is an ugly expression and cast. Also I think it is unnecessary. What's preventing this from being written without a cast? Perhaps the ring array needs to be a union of "void *" and struct nb_ring_entry? > + > + old_value = *ring_ptr; > + > + /* If the tail entry's modification counter doesn't match the > + * producer tail index, it's already been updated. > + */ > + if (old_value.cnt != tail) > + continue; Continue restarts the loop at the condition test in the for statement, 'i' and 'n' are unchanged. Then we re-read 'prod_64.tail' and 'ring[tail & mask]'. If some other thread never updates 'prod_64.tail', the test here (ring[tail].cnt != tail) will still be true and we will spin forever. Waiting for other threads <=> blocking behaviour so this is not a non- blocking design. > + > + /* Prepare the new entry. The cnt field mitigates the ABA > + * problem on the ring write. > + */ > + new_value.ptr = obj_table[i]; > + new_value.cnt = tail + r->size; > + > + if (rte_atomic128_cmpset((volatile rte_int128_t *)ring_ptr, > + (rte_int128_t *)&old_value, > + (rte_int128_t *)&new_value)) > + i++; > + > + /* Every thread attempts the cmpset, so they don't have to > wait > + * for the thread that successfully enqueued to the ring. > + * Using a 64-bit tail mitigates the ABA problem here. > + * > + * Built-in used to handle variable-sized tail index. > + */ But prod_64.tail is 64 bits so not really variable size? > + __sync_bool_compare_and_swap(&r->prod_64.tail, tail, tail + > 1); What memory order is required here? Why not use __atomic_compare_exchange() with explicit memory order parameters? > + } > + > +end: > + if (free_space != NULL) > + *free_space = free_entries - n; > + return n; > +#endif > +} > + > +/** > + * @internal Enqueue several objects on the non-blocking ring > + * > + * @param r > + * A pointer to the ring structure. > + * @param obj_table > + * A pointer to a table of void * pointers (objects). > + * @param n > + * The number of objects to add in the ring from the obj_table. > + * @param behavior > + * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items to the ring > + * RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to the ring > + * @param is_sp > + * Indicates whether to use single producer or multi-producer head update > + * @param free_space > + * returns the amount of space after the enqueue operation has finished > + * @return > + * Actual number of objects enqueued. > + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. 
> + */ > +static __rte_always_inline unsigned int > +__rte_ring_do_nb_enqueue(struct rte_ring *r, void * const *obj_table, > + unsigned int n, enum rte_ring_queue_behavior > behavior, > + unsigned int is_sp, unsigned int *free_space) > +{ > + if (is_sp) > + return __rte_ring_do_nb_enqueue_sp(r, obj_table, n, > + behavior, free_space); > + else > + return __rte_ring_do_nb_enqueue_mp(r, obj_table, n, > + behavior, free_space); > +} > + > +/** > + * @internal > + * Dequeue several objects from the non-blocking ring (single-consumer > only) > + * > + * @param r > + * A pointer to the ring structure. > + * @param obj_table > + * A pointer to a table of void * pointers (objects). > + * @param n > + * The number of objects to pull from the ring. > + * @param behavior > + * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from the ring > + * RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from the ring > + * @param available > + * returns the number of remaining ring entries after the dequeue has > finished > + * @return > + * - Actual number of objects dequeued. > + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. > + */ > +static __rte_always_inline unsigned int > +__rte_ring_do_nb_dequeue_sc(struct rte_ring *r, void **obj_table, > + unsigned int n, > + enum rte_ring_queue_behavior behavior, > + unsigned int *available) > +{ > + size_t head, next; > + uint32_t entries; > + > + n = __rte_ring_move_cons_head_64(r, 1, n, behavior, > + &head, &next, &entries); > + if (n == 0) > + goto end; > + > + DEQUEUE_PTRS_NB(r, &r[1], head, obj_table, n); > + > + r->cons_64.tail += n; Memory ordering? Consumer synchronises with producer. > + > +end: > + if (available != NULL) > + *available = entries - n; > + return n; > +} > + > +/** > + * @internal > + * Dequeue several objects from the non-blocking ring (multi-consumer safe) > + * > + * @param r > + * A pointer to the ring structure. > + * @param obj_table > + * A pointer to a table of void * pointers (objects). > + * @param n > + * The number of objects to pull from the ring. > + * @param behavior > + * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from the ring > + * RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from the ring > + * @param available > + * returns the number of remaining ring entries after the dequeue has > finished > + * @return > + * - Actual number of objects dequeued. > + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. > + */ > +static __rte_always_inline unsigned int > +__rte_ring_do_nb_dequeue_mc(struct rte_ring *r, void **obj_table, > + unsigned int n, > + enum rte_ring_queue_behavior behavior, > + unsigned int *available) > +{ > + size_t head, next; > + uint32_t entries; > + > + n = __rte_ring_move_cons_head_64(r, 0, n, behavior, > + &head, &next, &entries); > + if (n == 0) > + goto end; > + > + while (1) { > + size_t tail = r->cons_64.tail; > + > + /* Dequeue from the cons tail onwards. If multiple threads > read > + * the same pointers, the thread that successfully performs > the > + * CAS will keep them and the other(s) will retry. > + */ > + DEQUEUE_PTRS_NB(r, &r[1], tail, obj_table, n); > + > + next = tail + n; > + > + /* Built-in used to handle variable-sized tail index. */ > + if (__sync_bool_compare_and_swap(&r->cons_64.tail, tail, > next)) > + /* There is potential for the ABA problem here, but > + * that is mitigated by the large (64-bit) tail. 
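Returning to the memory-ordering questions above: for the single-producer and single-consumer tail updates I am thinking of the usual release/acquire pairing, roughly as below. I am reusing the patch's field names; the acquire side would presumably sit in the (not shown) __rte_ring_move_*_head_64 helpers, so treat this strictly as a sketch:

/* Producer side (single producer), after the ENQUEUE_PTRS_NB() copy:
 * publish the new tail with a release store.
 */
__atomic_store_n(&r->prod_64.tail, r->prod_64.tail + n, __ATOMIC_RELEASE);

/* Consumer side, when computing available entries: observe the producer
 * tail with an acquire load, which pairs with the release store above and
 * orders the subsequent entry reads after it.
 */
uint64_t prod_tail = __atomic_load_n(&r->prod_64.tail, __ATOMIC_ACQUIRE);

The symmetric pair (release store of cons_64.tail, acquire load by the producer) applies to the dequeue side.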
> + */ > + break; > + } > + > +end: > + if (available != NULL) > + *available = entries - n; > + return n; > +} > + > +/** > + * @internal Dequeue several objects from the non-blocking ring > + * > + * @param r > + * A pointer to the ring structure. > + * @param obj_table > + * A pointer to a table of void * pointers (objects). > + * @param n > + * The number of objects to pull from the ring. > + * @param behavior > + * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from the ring > + * RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from the ring > + * @param available > + * returns the number of remaining ring entries after the dequeue has > finished > + * @return > + * - Actual number of objects dequeued. > + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. > + */ > +static __rte_always_inline unsigned int > +__rte_ring_do_nb_dequeue(struct rte_ring *r, void **obj_table, > + unsigned int n, enum rte_ring_queue_behavior behavior, > + unsigned int is_sc, unsigned int *available) > +{ > + if (is_sc) > + return __rte_ring_do_nb_dequeue_sc(r, obj_table, n, > + behavior, available); > + else > + return __rte_ring_do_nb_dequeue_mc(r, obj_table, n, > + behavior, available); > +} > + > /** > * @internal Enqueue several objects on the ring > * > @@ -438,8 +853,14 @@ static __rte_always_inline unsigned int > rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table, > unsigned int n, unsigned int *free_space) > { > - return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED, > - __IS_MP, free_space); > + if (r->flags & RING_F_NB) > + return __rte_ring_do_nb_enqueue(r, obj_table, n, > + RTE_RING_QUEUE_FIXED, > __IS_MP, > + free_space); > + else > + return __rte_ring_do_enqueue(r, obj_table, n, > + RTE_RING_QUEUE_FIXED, __IS_MP, > + free_space); > } > > /** > @@ -461,8 +882,14 @@ static __rte_always_inline unsigned int > rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table, > unsigned int n, unsigned int *free_space) > { > - return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED, > - __IS_SP, free_space); > + if (r->flags & RING_F_NB) > + return __rte_ring_do_nb_enqueue(r, obj_table, n, > + RTE_RING_QUEUE_FIXED, > __IS_SP, > + free_space); > + else > + return __rte_ring_do_enqueue(r, obj_table, n, > + RTE_RING_QUEUE_FIXED, __IS_SP, > + free_space); > } > > /** > @@ -488,8 +915,14 @@ static __rte_always_inline unsigned int > rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table, > unsigned int n, unsigned int *free_space) > { > - return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED, > - r->prod.single, free_space); > + if (r->flags & RING_F_NB) > + return __rte_ring_do_nb_enqueue(r, obj_table, n, > + RTE_RING_QUEUE_FIXED, > + r->prod_64.single, > free_space); > + else > + return __rte_ring_do_enqueue(r, obj_table, n, > + RTE_RING_QUEUE_FIXED, > + r->prod.single, free_space); > } > > /** > @@ -572,8 +1005,14 @@ static __rte_always_inline unsigned int > rte_ring_mc_dequeue_bulk(struct rte_ring *r, void **obj_table, > unsigned int n, unsigned int *available) > { > - return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED, > - __IS_MC, available); > + if (r->flags & RING_F_NB) > + return __rte_ring_do_nb_dequeue(r, obj_table, n, > + RTE_RING_QUEUE_FIXED, > __IS_MC, > + available); > + else > + return __rte_ring_do_dequeue(r, obj_table, n, > + RTE_RING_QUEUE_FIXED, __IS_MC, > + available); > } > > /** > @@ -596,8 +1035,14 @@ static __rte_always_inline unsigned int > 
rte_ring_sc_dequeue_bulk(struct rte_ring *r, void **obj_table, > unsigned int n, unsigned int *available) > { > - return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED, > - __IS_SC, available); > + if (r->flags & RING_F_NB) > + return __rte_ring_do_nb_dequeue(r, obj_table, n, > + RTE_RING_QUEUE_FIXED, > __IS_SC, > + available); > + else > + return __rte_ring_do_dequeue(r, obj_table, n, > + RTE_RING_QUEUE_FIXED, __IS_SC, > + available); > } > > /** > @@ -623,8 +1068,14 @@ static __rte_always_inline unsigned int > rte_ring_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned int n, > unsigned int *available) > { > - return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED, > - r->cons.single, available); > + if (r->flags & RING_F_NB) > + return __rte_ring_do_nb_dequeue(r, obj_table, n, > + RTE_RING_QUEUE_FIXED, > + r->cons_64.single, > available); > + else > + return __rte_ring_do_dequeue(r, obj_table, n, > + RTE_RING_QUEUE_FIXED, > + r->cons.single, available); > } > > /** > @@ -699,9 +1150,13 @@ rte_ring_dequeue(struct rte_ring *r, void **obj_p) > static inline unsigned > rte_ring_count(const struct rte_ring *r) > { > - uint32_t prod_tail = r->prod.tail; > - uint32_t cons_tail = r->cons.tail; > - uint32_t count = (prod_tail - cons_tail) & r->mask; > + uint32_t count; > + > + if (r->flags & RING_F_NB) > + count = (r->prod_64.tail - r->cons_64.tail) & r->mask; > + else > + count = (r->prod.tail - r->cons.tail) & r->mask; > + > return (count > r->capacity) ? r->capacity : count; > } > > @@ -821,8 +1276,14 @@ static __rte_always_inline unsigned > rte_ring_mp_enqueue_burst(struct rte_ring *r, void * const *obj_table, > unsigned int n, unsigned int *free_space) > { > - return __rte_ring_do_enqueue(r, obj_table, n, > - RTE_RING_QUEUE_VARIABLE, __IS_MP, free_space); > + if (r->flags & RING_F_NB) > + return __rte_ring_do_nb_enqueue(r, obj_table, n, > + RTE_RING_QUEUE_VARIABLE, > + __IS_MP, free_space); > + else > + return __rte_ring_do_enqueue(r, obj_table, n, > + RTE_RING_QUEUE_VARIABLE, > + __IS_MP, free_space); > } > > /** > @@ -844,8 +1305,14 @@ static __rte_always_inline unsigned > rte_ring_sp_enqueue_burst(struct rte_ring *r, void * const *obj_table, > unsigned int n, unsigned int *free_space) > { > - return __rte_ring_do_enqueue(r, obj_table, n, > - RTE_RING_QUEUE_VARIABLE, __IS_SP, free_space); > + if (r->flags & RING_F_NB) > + return __rte_ring_do_nb_enqueue(r, obj_table, n, > + RTE_RING_QUEUE_VARIABLE, > + __IS_SP, free_space); > + else > + return __rte_ring_do_enqueue(r, obj_table, n, > + RTE_RING_QUEUE_VARIABLE, > + __IS_SP, free_space); > } > > /** > @@ -871,8 +1338,14 @@ static __rte_always_inline unsigned > rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table, > unsigned int n, unsigned int *free_space) > { > - return __rte_ring_do_enqueue(r, obj_table, n, > RTE_RING_QUEUE_VARIABLE, > - r->prod.single, free_space); > + if (r->flags & RING_F_NB) > + return __rte_ring_do_nb_enqueue(r, obj_table, n, > + RTE_RING_QUEUE_VARIABLE, > + r->prod_64.single, > free_space); > + else > + return __rte_ring_do_enqueue(r, obj_table, n, > + RTE_RING_QUEUE_VARIABLE, > + r->prod.single, free_space); > } > > /** > @@ -899,8 +1372,14 @@ static __rte_always_inline unsigned > rte_ring_mc_dequeue_burst(struct rte_ring *r, void **obj_table, > unsigned int n, unsigned int *available) > { > - return __rte_ring_do_dequeue(r, obj_table, n, > - RTE_RING_QUEUE_VARIABLE, __IS_MC, available); > + if (r->flags & RING_F_NB) > + return __rte_ring_do_nb_dequeue(r, obj_table, 
n, > + RTE_RING_QUEUE_VARIABLE, > + __IS_MC, available); > + else > + return __rte_ring_do_dequeue(r, obj_table, n, > + RTE_RING_QUEUE_VARIABLE, > + __IS_MC, available); > } > > /** > @@ -924,8 +1403,14 @@ static __rte_always_inline unsigned > rte_ring_sc_dequeue_burst(struct rte_ring *r, void **obj_table, > unsigned int n, unsigned int *available) > { > - return __rte_ring_do_dequeue(r, obj_table, n, > - RTE_RING_QUEUE_VARIABLE, __IS_SC, available); > + if (r->flags & RING_F_NB) > + return __rte_ring_do_nb_dequeue(r, obj_table, n, > + RTE_RING_QUEUE_VARIABLE, > + __IS_SC, available); > + else > + return __rte_ring_do_dequeue(r, obj_table, n, > + RTE_RING_QUEUE_VARIABLE, > + __IS_SC, available); > } > > /** > @@ -951,9 +1436,14 @@ static __rte_always_inline unsigned > rte_ring_dequeue_burst(struct rte_ring *r, void **obj_table, > unsigned int n, unsigned int *available) > { > - return __rte_ring_do_dequeue(r, obj_table, n, > - RTE_RING_QUEUE_VARIABLE, > - r->cons.single, available); > + if (r->flags & RING_F_NB) > + return __rte_ring_do_nb_dequeue(r, obj_table, n, > + RTE_RING_QUEUE_VARIABLE, > + r->cons_64.single, > available); > + else > + return __rte_ring_do_dequeue(r, obj_table, n, > + RTE_RING_QUEUE_VARIABLE, > + r->cons.single, available); > } > > #ifdef __cplusplus > diff --git a/lib/librte_ring/rte_ring_version.map > b/lib/librte_ring/rte_ring_version.map > index d935efd0d..8969467af 100644 > --- a/lib/librte_ring/rte_ring_version.map > +++ b/lib/librte_ring/rte_ring_version.map > @@ -17,3 +17,10 @@ DPDK_2.2 { > rte_ring_free; > > } DPDK_2.0; > + > +DPDK_19.05 { > + global: > + > + rte_ring_get_memsize; > + > +} DPDK_2.2; -- Ola Liljedahl, Networking System Architect, Arm Phone +46706866373, Skype ola.liljedahl