Hi Ola,

<snip>
> > @@ -331,6 +433,319 @@ void rte_ring_dump(FILE *f, const struct rte_ring *r);
> >  #endif
> >  #include "rte_ring_generic_64.h"
> >
> > +/* @internal 128-bit structure used by the non-blocking ring */
> > +struct nb_ring_entry {
> > +	void *ptr; /**< Data pointer */
> > +	uint64_t cnt; /**< Modification counter */
>
> Why not make 'cnt' uintptr_t? This way 32-bit architectures will also be
> supported. I think there are some claims that DPDK still supports e.g. ARMv7a
> and possibly also 32-bit x86?

I chose a 64-bit modification counter because (practically speaking) the ABA
problem will not occur with such a large counter -- definitely not within my
lifetime. See the "Discussion" section of the commit message for more
information.

With a 32-bit counter, there is a very (very) low likelihood of it, but it is
possible. Personally, I don't feel comfortable providing such code, because a)
I doubt all users would understand the implementation well enough to do the
risk/reward analysis, and b) such a bug would be near impossible to reproduce
and root-cause if it did occur.

> > +};
> > +
> > +/* The non-blocking ring algorithm is based on the original rte ring (derived
> > + * from FreeBSD's bufring.h) and inspired by Michael and Scott's non-blocking
> > + * concurrent queue.
> > + */
> > +
> > +/**
> > + * @internal
> > + * Enqueue several objects on the non-blocking ring (single-producer only)
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + * @param obj_table
> > + *   A pointer to a table of void * pointers (objects).
> > + * @param n
> > + *   The number of objects to add in the ring from the obj_table.
> > + * @param behavior
> > + *   RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items to the ring
> > + *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to the ring
> > + * @param free_space
> > + *   returns the amount of space after the enqueue operation has finished
> > + * @return
> > + *   Actual number of objects enqueued.
> > + *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> > + */
> > +static __rte_always_inline unsigned int
> > +__rte_ring_do_nb_enqueue_sp(struct rte_ring *r, void * const *obj_table,
> > +			    unsigned int n,
> > +			    enum rte_ring_queue_behavior behavior,
> > +			    unsigned int *free_space)
> > +{
> > +	uint32_t free_entries;
> > +	size_t head, next;
> > +
> > +	n = __rte_ring_move_prod_head_64(r, 1, n, behavior,
> > +					 &head, &next, &free_entries);
> > +	if (n == 0)
> > +		goto end;
> > +
> > +	ENQUEUE_PTRS_NB(r, &r[1], head, obj_table, n);
> > +
> > +	r->prod_64.tail += n;
>
> Don't we need release order when (or smp_wmb between) writing of the ring
> pointers and the update of tail? By updating the tail pointer, we are
> synchronising with a consumer.
>
> I prefer using __atomic operations even for load and store. You can see which
> parts of the code that synchronise with each other, e.g. store-release to some
> location synchronises with load-acquire from the same location. If you don't
> know how different threads synchronise with each other, you are very likely to
> make mistakes.

You can tell this code was written when I thought x86-64 was the only viable
target :). Yes, you are correct.
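For what it's worth, a rough sketch of the two flavors of the single-producer
tail publication I have in mind (this is only an illustration against this
patch's prod_64.tail field, not the final code):

	/* Barrier-based flavor (rte_ring_generic.h style): the write barrier
	 * orders the ENQUEUE_PTRS_NB() stores before the tail store.
	 */
	rte_smp_wmb();
	r->prod_64.tail += n;

	/* C11 flavor (rte_ring_c11_mem.h style): the release store pairs with
	 * the consumer's load-acquire of prod_64.tail, so the new entries are
	 * guaranteed visible before the consumer observes the updated tail.
	 */
	__atomic_store_n(&r->prod_64.tail, r->prod_64.tail + n,
			 __ATOMIC_RELEASE);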
With regards to using __atomic intrinsics, I'm planning on taking a similar
approach to the functions duplicated in rte_ring_generic.h and
rte_ring_c11_mem.h: one version that uses rte_atomic functions (and thus
stricter memory ordering) and one that uses __atomic intrinsics (and thus can
benefit from more relaxed memory ordering).

> > +
> > +end:
> > +	if (free_space != NULL)
> > +		*free_space = free_entries - n;
> > +	return n;
> > +}
> > +
> > +/**
> > + * @internal
> > + * Enqueue several objects on the non-blocking ring (multi-producer safe)
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + * @param obj_table
> > + *   A pointer to a table of void * pointers (objects).
> > + * @param n
> > + *   The number of objects to add in the ring from the obj_table.
> > + * @param behavior
> > + *   RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items to the ring
> > + *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to the ring
> > + * @param free_space
> > + *   returns the amount of space after the enqueue operation has finished
> > + * @return
> > + *   Actual number of objects enqueued.
> > + *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> > + */
> > +static __rte_always_inline unsigned int
> > +__rte_ring_do_nb_enqueue_mp(struct rte_ring *r, void * const *obj_table,
> > +			    unsigned int n,
> > +			    enum rte_ring_queue_behavior behavior,
> > +			    unsigned int *free_space)
> > +{
> > +#if !defined(RTE_ARCH_X86_64) || !defined(ALLOW_EXPERIMENTAL_API)
> > +	RTE_SET_USED(r);
> > +	RTE_SET_USED(obj_table);
> > +	RTE_SET_USED(n);
> > +	RTE_SET_USED(behavior);
> > +	RTE_SET_USED(free_space);
> > +#ifndef ALLOW_EXPERIMENTAL_API
> > +	printf("[%s()] RING_F_NB requires an experimental API."
> > +	       " Recompile with ALLOW_EXPERIMENTAL_API to use it.\n"
> > +	       , __func__);
> > +#endif
> > +	return 0;
> > +#endif
> > +#if defined(RTE_ARCH_X86_64) && defined(ALLOW_EXPERIMENTAL_API)
> > +	size_t head, next, tail;
> > +	uint32_t free_entries;
> > +	unsigned int i;
> > +
> > +	n = __rte_ring_move_prod_head_64(r, 0, n, behavior,
> > +					 &head, &next, &free_entries);
> > +	if (n == 0)
> > +		goto end;
> > +
> > +	for (i = 0; i < n; /* i incremented if enqueue succeeds */) {
> > +		struct nb_ring_entry old_value, new_value;
> > +		struct nb_ring_entry *ring_ptr;
> > +
> > +		/* Enqueue to the tail entry. If another thread wins the race,
> > +		 * retry with the new tail.
> > +		 */
> > +		tail = r->prod_64.tail;
> > +
> > +		ring_ptr = &((struct nb_ring_entry *)&r[1])[tail & r->mask];
>
> This is an ugly expression and cast. Also I think it is unnecessary. What's
> preventing this from being written without a cast? Perhaps the ring array
> needs to be a union of "void *" and struct nb_ring_entry?

The cast is necessary for the correct pointer arithmetic (let "uintptr_t base
== &r[1]"):

- With cast: ring_ptr = base + sizeof(struct nb_ring_entry) * (tail & r->mask);
- W/o cast:  ring_ptr = base + sizeof(struct rte_ring) * (tail & r->mask);

FWIW, this is essentially the same as is done with the second argument (&r[1])
to ENQUEUE_PTRS and DEQUEUE_PTRS, but there it's split across multiple lines of
code. The equivalent here would be:

	struct nb_ring_entry *ring_base = (struct nb_ring_entry *)&r[1];

	ring_ptr = &ring_base[tail & r->mask];

Which is more legible, I think.

There is no ring array structure in which to add a union; the ring array is a
contiguous chunk of memory that immediately follows the end of a struct
rte_ring.
We interpret the memory there according to the ring entry data type (void *
for regular rings and struct nb_ring_entry for non-blocking rings).

> > +
> > +		old_value = *ring_ptr;
> > +
> > +		/* If the tail entry's modification counter doesn't match the
> > +		 * producer tail index, it's already been updated.
> > +		 */
> > +		if (old_value.cnt != tail)
> > +			continue;
>
> Continue restarts the loop at the condition test in the for statement, 'i'
> and 'n' are unchanged. Then we re-read 'prod_64.tail' and 'ring[tail & mask]'.
> If some other thread never updates 'prod_64.tail', the test here
> (ring[tail].cnt != tail) will still be true and we will spin forever.
>
> Waiting for other threads <=> blocking behaviour so this is not a
> non-blocking design.

You're absolutely right. The if-statement was added as an optimization to
avoid 128-bit cmpset operations that are known to fail, but in this form it
violates the non-blocking design.

I see two solutions: 1) drop the if-statement altogether, or 2) attempt to
update prod_64.tail before continuing. Both require every thread to attempt to
update prod_64.tail on every iteration, but #2 will result in fewer failed
128-bit cmpsets (rough sketch of #2 further below).

> > +
> > +		/* Prepare the new entry. The cnt field mitigates the ABA
> > +		 * problem on the ring write.
> > +		 */
> > +		new_value.ptr = obj_table[i];
> > +		new_value.cnt = tail + r->size;
> > +
> > +		if (rte_atomic128_cmpset((volatile rte_int128_t *)ring_ptr,
> > +					 (rte_int128_t *)&old_value,
> > +					 (rte_int128_t *)&new_value))
> > +			i++;
> > +
> > +		/* Every thread attempts the cmpset, so they don't have to wait
> > +		 * for the thread that successfully enqueued to the ring.
> > +		 * Using a 64-bit tail mitigates the ABA problem here.
> > +		 *
> > +		 * Built-in used to handle variable-sized tail index.
> > +		 */
>
> But prod_64.tail is 64 bits so not really variable size?
> (See next comment)
>
> > +		__sync_bool_compare_and_swap(&r->prod_64.tail, tail, tail + 1);
>
> What memory order is required here? Why not use
> __atomic_compare_exchange() with explicit memory order parameters?

This is an artifact from an older patchset that used uintptr_t, and before I
learned that other platforms could support this non-blocking algorithm (hence
the __sync type intrinsic was sufficient). At any rate, as described earlier in
this response, I plan on writing these functions using the __atomic builtins in
the next patchset.
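For illustration, #2 could look roughly like this inside the enqueue loop
(sketch only, using the __atomic builtins mentioned above and assuming
prod_64.tail is a uint64_t; the exact memory orders still need thought):

	/* Sketch of option #2, not final code: when the entry at 'tail' has
	 * already been filled by another producer (cnt != tail), help advance
	 * the producer tail instead of spinning, then retry with the new tail.
	 */
	if (old_value.cnt != tail) {
		uint64_t expected = tail;

		/* Best-effort helping CAS: if it succeeds, we advanced the
		 * tail on behalf of the thread whose 128-bit cmpset won; if it
		 * fails, some other thread already advanced it. Either way no
		 * thread waits on any particular other thread, which preserves
		 * the non-blocking property.
		 */
		__atomic_compare_exchange_n(&r->prod_64.tail, &expected,
					    tail + 1, 0 /* strong */,
					    __ATOMIC_RELEASE, __ATOMIC_RELAXED);
		continue;
	}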
> > +	}
> > +
> > +end:
> > +	if (free_space != NULL)
> > +		*free_space = free_entries - n;
> > +	return n;
> > +#endif
> > +}
> > +
> > +/**
> > + * @internal Enqueue several objects on the non-blocking ring
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + * @param obj_table
> > + *   A pointer to a table of void * pointers (objects).
> > + * @param n
> > + *   The number of objects to add in the ring from the obj_table.
> > + * @param behavior
> > + *   RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items to the ring
> > + *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to the ring
> > + * @param is_sp
> > + *   Indicates whether to use single producer or multi-producer head update
> > + * @param free_space
> > + *   returns the amount of space after the enqueue operation has finished
> > + * @return
> > + *   Actual number of objects enqueued.
> > + *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> > + */
> > +static __rte_always_inline unsigned int
> > +__rte_ring_do_nb_enqueue(struct rte_ring *r, void * const *obj_table,
> > +			 unsigned int n, enum rte_ring_queue_behavior behavior,
> > +			 unsigned int is_sp, unsigned int *free_space)
> > +{
> > +	if (is_sp)
> > +		return __rte_ring_do_nb_enqueue_sp(r, obj_table, n,
> > +						   behavior, free_space);
> > +	else
> > +		return __rte_ring_do_nb_enqueue_mp(r, obj_table, n,
> > +						   behavior, free_space);
> > +}
> > +
> > +/**
> > + * @internal
> > + * Dequeue several objects from the non-blocking ring (single-consumer only)
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + * @param obj_table
> > + *   A pointer to a table of void * pointers (objects).
> > + * @param n
> > + *   The number of objects to pull from the ring.
> > + * @param behavior
> > + *   RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from the ring
> > + *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from the ring
> > + * @param available
> > + *   returns the number of remaining ring entries after the dequeue has finished
> > + * @return
> > + *   - Actual number of objects dequeued.
> > + *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> > + */
> > +static __rte_always_inline unsigned int
> > +__rte_ring_do_nb_dequeue_sc(struct rte_ring *r, void **obj_table,
> > +			    unsigned int n,
> > +			    enum rte_ring_queue_behavior behavior,
> > +			    unsigned int *available)
> > +{
> > +	size_t head, next;
> > +	uint32_t entries;
> > +
> > +	n = __rte_ring_move_cons_head_64(r, 1, n, behavior,
> > +					 &head, &next, &entries);
> > +	if (n == 0)
> > +		goto end;
> > +
> > +	DEQUEUE_PTRS_NB(r, &r[1], head, obj_table, n);
> > +
> > +	r->cons_64.tail += n;
>
> Memory ordering? Consumer synchronises with producer.

Agreed, that is missing here. Will fix.

Thanks,
Gage

> --
> Ola Liljedahl, Networking System Architect, Arm
> Phone +46706866373, Skype ola.liljedahl