Hi Ola,

<snip>

> > @@ -331,6 +433,319 @@ void rte_ring_dump(FILE *f, const struct
> > rte_ring *r);
> >  #endif
> >  #include "rte_ring_generic_64.h"
> >
> > +/* @internal 128-bit structure used by the non-blocking ring */
> > +struct nb_ring_entry {
> > +   void *ptr; /**< Data pointer */
> > +   uint64_t cnt; /**< Modification counter */
> Why not make 'cnt' uintptr_t? This way 32-bit architectures will also be
> supported. I think there are some claims that DPDK still supports e.g. ARMv7a
> and possibly also 32-bit x86?

I chose a 64-bit modification counter because (practically speaking) the ABA 
problem will not occur with such a large counter -- definitely not within my 
lifetime. See the "Discussion" section of the commit message for more 
information.

With a 32-bit counter, there is a very (very) low likelihood of it, but it is 
possible. Personally, I don't feel comfortable providing such code, because a) 
I doubt all users would understand the implementation well enough to do the 
risk/reward analysis, and b) such a bug would be near impossible to reproduce 
and root-cause if it did occur.

> 
> > +};
> > +
> > +/* The non-blocking ring algorithm is based on the original rte ring
> > +(derived
> > + * from FreeBSD's bufring.h) and inspired by Michael and Scott's
> > +non-blocking
> > + * concurrent queue.
> > + */
> > +
> > +/**
> > + * @internal
> > + *   Enqueue several objects on the non-blocking ring
> > +(single-producer only)
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + * @param obj_table
> > + *   A pointer to a table of void * pointers (objects).
> > + * @param n
> > + *   The number of objects to add in the ring from the obj_table.
> > + * @param behavior
> > + *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items to the
> > +ring
> > + *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to
> > +the ring
> > + * @param free_space
> > + *   returns the amount of space after the enqueue operation has
> > +finished
> > + * @return
> > + *   Actual number of objects enqueued.
> > + *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> > + */
> > +static __rte_always_inline unsigned int
> > +__rte_ring_do_nb_enqueue_sp(struct rte_ring *r, void * const *obj_table,
> > +                       unsigned int n,
> > +                       enum rte_ring_queue_behavior behavior,
> > +                       unsigned int *free_space)
> > +{
> > +   uint32_t free_entries;
> > +   size_t head, next;
> > +
> > +   n = __rte_ring_move_prod_head_64(r, 1, n, behavior,
> > +                                    &head, &next, &free_entries);
> > +   if (n == 0)
> > +           goto end;
> > +
> > +   ENQUEUE_PTRS_NB(r, &r[1], head, obj_table, n);
> > +
> > +   r->prod_64.tail += n;
> Don't we need release order when (or smp_wmb between) writing of the ring
> pointers and the update of tail? By updating the tail pointer, we are
> synchronising with a consumer.
> 
> I prefer using __atomic operations even for load and store. You can see which
> parts of the code that synchronise with each other, e.g. store-release to some
> location synchronises with load-acquire from the same location. If you don't
> know how different threads synchronise with each other, you are very likely to
> make mistakes.
> 

You can tell this code was written when I thought x86-64 was the only viable 
target :). Yes, you are correct.

With regards to using __atomic intrinsics, I'm planning on taking a similar 
approach to the functions duplicated in rte_ring_generic.h and 
rte_ring_c11_mem.h: one version that uses rte_atomic functions (and thus 
stricter memory ordering) and one that uses __atomic intrinsics (and thus can 
benefit from more relaxed memory ordering).

> > +
> > +end:
> > +   if (free_space != NULL)
> > +           *free_space = free_entries - n;
> > +   return n;
> > +}
> > +
> > +/**
> > + * @internal
> > + *   Enqueue several objects on the non-blocking ring (multi-producer
> > +safe)
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + * @param obj_table
> > + *   A pointer to a table of void * pointers (objects).
> > + * @param n
> > + *   The number of objects to add in the ring from the obj_table.
> > + * @param behavior
> > + *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items to the
> > +ring
> > + *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to
> > +the ring
> > + * @param free_space
> > + *   returns the amount of space after the enqueue operation has
> > +finished
> > + * @return
> > + *   Actual number of objects enqueued.
> > + *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> > + */
> > +static __rte_always_inline unsigned int
> > +__rte_ring_do_nb_enqueue_mp(struct rte_ring *r, void * const *obj_table,
> > +                       unsigned int n,
> > +                       enum rte_ring_queue_behavior behavior,
> > +                       unsigned int *free_space)
> > +{
> > +#if !defined(RTE_ARCH_X86_64) || !defined(ALLOW_EXPERIMENTAL_API)
> > +   RTE_SET_USED(r);
> > +   RTE_SET_USED(obj_table);
> > +   RTE_SET_USED(n);
> > +   RTE_SET_USED(behavior);
> > +   RTE_SET_USED(free_space);
> > +#ifndef ALLOW_EXPERIMENTAL_API
> > +   printf("[%s()] RING_F_NB requires an experimental API."
> > +          " Recompile with ALLOW_EXPERIMENTAL_API to use it.\n"
> > +          , __func__);
> > +#endif
> > +   return 0;
> > +#endif
> > +#if defined(RTE_ARCH_X86_64) && defined(ALLOW_EXPERIMENTAL_API)
> > +   size_t head, next, tail;
> > +   uint32_t free_entries;
> > +   unsigned int i;
> > +
> > +   n = __rte_ring_move_prod_head_64(r, 0, n, behavior,
> > +                                    &head, &next, &free_entries);
> > +   if (n == 0)
> > +           goto end;
> > +
> > +   for (i = 0; i < n; /* i incremented if enqueue succeeds */) {
> > +           struct nb_ring_entry old_value, new_value;
> > +           struct nb_ring_entry *ring_ptr;
> > +
> > +           /* Enqueue to the tail entry. If another thread wins the
> > race,
> > +            * retry with the new tail.
> > +            */
> > +           tail = r->prod_64.tail;
> > +
> > +           ring_ptr = &((struct nb_ring_entry *)&r[1])[tail & r->mask];
> This is an ugly expression and cast. Also I think it is unnecessary. What's
> preventing this from being written without a cast? Perhaps the ring array 
> needs
> to be a union of "void *" and struct nb_ring_entry?

The cast is necessary for the correct pointer arithmetic (let "uintptr_t base 
== &r[1]"):
- With cast: ring_ptr = base + sizeof(struct nb_ring_entry) * (tail & r->mask);
- W/o cast: ring_ptr = base + sizeof(struct rte_ring) * (tail & r->mask);

FWIW, this is essentially the same as is done with the second argument (&r[1]) 
to ENQUEUE_PTRS and DEQUEUE_PTRS, but there it's split across multiple lines of 
code. The equivalent here would be:
 
struct nb_ring_entry *ring_base = (struct nb_ring_entry*)&r[1];
ring_ptr = ring_base[tail & r->mask];

Which is more legible, I think.

There is no ring array structure in which to add a union; the ring array is a 
contiguous chunk of memory that immediately follows after the end of a struct 
rte_ring. We interpret the memory there according to the ring entry data type 
(void * for regular rings and struct nb_ring_entry for non-blocking rings).

> 
> > +
> > +           old_value = *ring_ptr;
> > +
> > +           /* If the tail entry's modification counter doesn't match the
> > +            * producer tail index, it's already been updated.
> > +            */
> > +           if (old_value.cnt != tail)
> > +                   continue;
> Continue restarts the loop at the condition test in the for statement, 'i' 
> and 'n'
> are unchanged. Then we re-read 'prod_64.tail' and 'ring[tail & mask]'. If some
> other thread never updates 'prod_64.tail', the test here (ring[tail].cnt != 
> tail) will
> still be true and we will spin forever.
> 
> Waiting for other threads <=> blocking behaviour so this is not a non- 
> blocking
> design.
> 

You're absolutely right. The if-statement was added as optimization to avoid 
128-bit cmpset operations that are known to fail, but in this form it violates 
the non-blocking design.

I see two solutions: 1) drop the if-statement altogether, or 2) attempt to 
update prod_64.tail before continuing. Both require every thread to attempt to 
update prod_64.tail on every iteration, but #2 will result in fewer failed 
128-bit cmpsets.

> > +
> > +           /* Prepare the new entry. The cnt field mitigates the ABA
> > +            * problem on the ring write.
> > +            */
> > +           new_value.ptr = obj_table[i];
> > +           new_value.cnt = tail + r->size;
> > +
> > +           if (rte_atomic128_cmpset((volatile rte_int128_t *)ring_ptr,
> > +                                    (rte_int128_t *)&old_value,
> > +                                    (rte_int128_t *)&new_value))
> > +                   i++;
> > +
> > +           /* Every thread attempts the cmpset, so they don't have to
> > wait
> > +            * for the thread that successfully enqueued to the ring.
> > +            * Using a 64-bit tail mitigates the ABA problem here.
> > +            *
> > +            * Built-in used to handle variable-sized tail index.
> > +            */
> But prod_64.tail is 64 bits so not really variable size?
> 

(See next comment)

> > +           __sync_bool_compare_and_swap(&r->prod_64.tail, tail, tail +
> > 1);
> What memory order is required here? Why not use
> __atomic_compare_exchange() with explicit memory order parameters?
> 

This is an artifact from an older patchset that used uintptr_t, and before I 
learned that other platforms could support this non-blocking algorithm (hence 
the __sync type intrinsic was sufficient).

At any rate, as described earlier in this response, I plan on writing these 
functions using the __atomic builtins in the next patchset.

> > +   }
> > +
> > +end:
> > +   if (free_space != NULL)
> > +           *free_space = free_entries - n;
> > +   return n;
> > +#endif
> > +}
> > +
> > +/**
> > + * @internal Enqueue several objects on the non-blocking ring
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + * @param obj_table
> > + *   A pointer to a table of void * pointers (objects).
> > + * @param n
> > + *   The number of objects to add in the ring from the obj_table.
> > + * @param behavior
> > + *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items to the
> > +ring
> > + *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to
> > +the ring
> > + * @param is_sp
> > + *   Indicates whether to use single producer or multi-producer head
> > +update
> > + * @param free_space
> > + *   returns the amount of space after the enqueue operation has
> > +finished
> > + * @return
> > + *   Actual number of objects enqueued.
> > + *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> > + */
> > +static __rte_always_inline unsigned int
> > +__rte_ring_do_nb_enqueue(struct rte_ring *r, void * const *obj_table,
> > +                    unsigned int n, enum rte_ring_queue_behavior
> > behavior,
> > +                    unsigned int is_sp, unsigned int *free_space) {
> > +   if (is_sp)
> > +           return __rte_ring_do_nb_enqueue_sp(r, obj_table, n,
> > +                                              behavior, free_space);
> > +   else
> > +           return __rte_ring_do_nb_enqueue_mp(r, obj_table, n,
> > +                                              behavior, free_space);
> > +}
> > +
> > +/**
> > + * @internal
> > + *   Dequeue several objects from the non-blocking ring
> > +(single-consumer
> > only)
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + * @param obj_table
> > + *   A pointer to a table of void * pointers (objects).
> > + * @param n
> > + *   The number of objects to pull from the ring.
> > + * @param behavior
> > + *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from
> > + the ring
> > + *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from
> > + the ring
> > + * @param available
> > + *   returns the number of remaining ring entries after the dequeue
> > + has
> > finished
> > + * @return
> > + *   - Actual number of objects dequeued.
> > + *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> > + */
> > +static __rte_always_inline unsigned int
> > +__rte_ring_do_nb_dequeue_sc(struct rte_ring *r, void **obj_table,
> > +                       unsigned int n,
> > +                       enum rte_ring_queue_behavior behavior,
> > +                       unsigned int *available)
> > +{
> > +   size_t head, next;
> > +   uint32_t entries;
> > +
> > +   n = __rte_ring_move_cons_head_64(r, 1, n, behavior,
> > +                                    &head, &next, &entries);
> > +   if (n == 0)
> > +           goto end;
> > +
> > +   DEQUEUE_PTRS_NB(r, &r[1], head, obj_table, n);
> > +
> > +   r->cons_64.tail += n;
> Memory ordering? Consumer synchronises with producer.
> 

Agreed, that is missing here. Will fix.

Thanks,
Gage

> --
> Ola Liljedahl, Networking System Architect, Arm Phone +46706866373, Skype
> ola.liljedahl

Reply via email to