Hi Honnappa,

> > +
> > +/**
> > + * @internal This function updates the producer head for enqueue
> > + *
> > + * @param r
> > + *   A pointer to the ring structure
> > + * @param num
> > + *   The number of elements we want to enqueue, i.e. how far the head
> > + *   should be moved
> > + * @param behavior
> > + *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items to the ring
> > + *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to the ring
> > + * @param old_head
> > + *   Returns the head value as it was before the move, i.e. where enqueue
> > + *   starts
> > + * @param free_entries
> > + *   Returns the amount of free space in the ring BEFORE the head was moved
> > + * @return
> > + *   Actual number of objects enqueued.
> > + *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> > + */
> > +static __rte_always_inline unsigned int
> > +__rte_ring_hts_move_prod_head(struct rte_ring *r, unsigned int num,
> > +	enum rte_ring_queue_behavior behavior, uint32_t *old_head,
> > +	uint32_t *free_entries)
> > +{
> > +	uint32_t n;
> > +	union rte_ring_ht_pos np, op;
> > +
> > +	const uint32_t capacity = r->capacity;
> > +
> > +	do {
> > +		/* Reset n to the initial burst count */
> > +		n = num;
> > +
> > +		/* wait for tail to be equal to head, acquire point */
> > +		__rte_ring_hts_head_wait(&r->hts_prod, &op);
> > +
> > +		/*
> > +		 * The subtraction is done between two unsigned 32-bit values
> > +		 * (the result is always modulo 32 bits even if we have
> > +		 * *old_head > cons_tail). So 'free_entries' is always between 0
> > +		 * and capacity (which is < size).
> > +		 */
> > +		*free_entries = capacity + r->cons.tail - op.pos.head;
> > +
> > +		/* check that we have enough room in ring */
> > +		if (unlikely(n > *free_entries))
> > +			n = (behavior == RTE_RING_QUEUE_FIXED) ?
> > +					0 : *free_entries;
> > +
> > +		if (n == 0)
> > +			break;
> > +
> > +		np.pos.tail = op.pos.tail;
> > +		np.pos.head = op.pos.head + n;
> > +
> > +	} while (__atomic_compare_exchange_n(&r->hts_prod.ht.raw,
> > +		&op.raw, np.raw,
> > +		0, __ATOMIC_RELEASE, __ATOMIC_RELAXED) == 0);

> __ATOMIC_RELEASE can be __ATOMIC_RELAXED. The RELEASE used while updating
> the tail, after the elements are written, is enough.
I looked at it once again, and I think RELAXED is probably not enough here (and the same is true for RELEASE). It seems we have to use ACQUIRE here (and in other similar places) to forbid the CPU from speculatively doing the actual object copies before the CAS completes.
Another alternative would be to use:

	cons_tail = __atomic_load_n(&r->cons.tail, __ATOMIC_ACQUIRE);
	*free_entries = capacity + cons_tail - op.pos.head;

instead of just:

	*free_entries = capacity + r->cons.tail - op.pos.head;

above. But that would mean two acquire points inside the loop:

	load(prod, ACQUIRE);
	load(cons.tail, ACQUIRE);

Plus, to me, CAS(..., ACQUIRE, ACQUIRE) seems more logical here, so I am leaning that way.
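I.e. the end of the loop above would become something like (just a sketch of the variant I have in mind, using the same fields as in the patch, not tested):

	} while (__atomic_compare_exchange_n(&r->hts_prod.ht.raw,
		&op.raw, np.raw,
		/* weak=0; both success and failure orderings ACQUIRE */
		0, __ATOMIC_ACQUIRE, __ATOMIC_ACQUIRE) == 0);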