Hi Gavin

> -----Original Message-----
> From: Gavin Hu [mailto:gavin...@arm.com]
> Sent: 7 August 2018 11:20
> To: dev@dpdk.org
> Cc: gavin...@arm.com; honnappa.nagaraha...@arm.com;
> steve.cap...@arm.com; ola.liljed...@arm.com;
> jerin.ja...@caviumnetworks.com; hemant.agra...@nxp.com; He, Jia
> <jia...@hxt-semitech.com>; sta...@dpdk.org
> Subject: [PATCH v2] ring: fix c11 memory ordering issue
>
> This patch includes two bug fixes (#1 and #2) and two optimisations (#3 and #4).
Maybe you need to split this into smaller parts.

> 1) In update_tail, read ht->tail using __atomic_load. Although the
>    compiler currently seems to be doing the right thing even without
>    __atomic_load, we don't want to give the compiler freedom to optimise
>    what should be an atomic load; it should not be arbitrarily moved
>    around.
> 2) Synchronize the load-acquire of the tail and the store-release
>    within update_tail. The store-release ensures all the ring operations,
>    enqueue or dequeue, are seen by the observers as soon as they see
>    the updated tail. The load-acquire is required to correctly compute
>    the free_entries or avail_entries, respectively for enqueue and
>    dequeue operations; the data dependency is not reliable for ordering,
>    as the compiler might break it by saving to temporary values to boost
>    performance.

Could you describe the race condition in detail? e.g.

cpu1		cpu2
code1		code2

Cheers,
Jia

> 3) In __rte_ring_move_prod_head, move the __atomic_load_n up and out of
>    the do {} while loop, as upon failure the old_head will be updated;
>    another load is costly and not necessary.
> 4) When calling __atomic_compare_exchange_n, use relaxed ordering for both
>    success and failure, as multiple threads can work independently on
>    the same end of the ring (either enqueue or dequeue) without
>    synchronization, unlike operating on the tail, which has to be finished
>    in sequence.
>
> The patch was benchmarked with test/ring_perf_autotest and it decreases the
> enqueue/dequeue latency by 5% ~ 24.6% with two lcores; the real gains are
> dependent on the number of lcores, the depth of the ring, and SPSC or MPMC.
> For 1 lcore, it also improves a little, about 3 ~ 4%.
> It is a big improvement; in the MPMC case, with a ring size of 32, it saves
> latency up to (6.90-5.20)/6.90 = 24.6%.
>
> Test result data:
>
> SP/SC bulk enq/dequeue (size: 8): 13.19
> MP/MC bulk enq/dequeue (size: 8): 25.79
> SP/SC bulk enq/dequeue (size: 32): 3.85
> MP/MC bulk enq/dequeue (size: 32): 6.90
>
> SP/SC bulk enq/dequeue (size: 8): 12.05
> MP/MC bulk enq/dequeue (size: 8): 23.06
> SP/SC bulk enq/dequeue (size: 32): 3.62
> MP/MC bulk enq/dequeue (size: 32): 5.20
>
> Fixes: 39368ebfc6 ("ring: introduce C11 memory model barrier option")
> Cc: sta...@dpdk.org
>
> Signed-off-by: Gavin Hu <gavin...@arm.com>
> Reviewed-by: Honnappa Nagarahalli <honnappa.nagaraha...@arm.com>
> Reviewed-by: Steve Capper <steve.cap...@arm.com>
> Reviewed-by: Ola Liljedahl <ola.liljed...@arm.com>
> ---
>  lib/librte_ring/rte_ring_c11_mem.h | 38 +++++++++++++++++++++++++-------------
>  1 file changed, 25 insertions(+), 13 deletions(-)
>
> diff --git a/lib/librte_ring/rte_ring_c11_mem.h b/lib/librte_ring/rte_ring_c11_mem.h
> index 94df3c4a6..cfa3be4a7 100644
> --- a/lib/librte_ring/rte_ring_c11_mem.h
> +++ b/lib/librte_ring/rte_ring_c11_mem.h
> @@ -21,7 +21,8 @@ update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t new_val,
>  	 * we need to wait for them to complete
>  	 */
>  	if (!single)
> -		while (unlikely(ht->tail != old_val))
> +		while (unlikely(old_val != __atomic_load_n(&ht->tail,
> +				__ATOMIC_RELAXED)))
>  			rte_pause();
>
>  	__atomic_store_n(&ht->tail, new_val, __ATOMIC_RELEASE);
> @@ -60,20 +61,24 @@ __rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp,
>  	unsigned int max = n;
>  	int success;
>
> +	*old_head = __atomic_load_n(&r->prod.head, __ATOMIC_RELAXED);
>  	do {
>  		/* Reset n to the initial burst count */
>  		n = max;
>
> -		*old_head = __atomic_load_n(&r->prod.head,
> -					__ATOMIC_ACQUIRE);
> -
> -		/*
> -		 * The subtraction is done between two unsigned 32bits value
> +		/* load-acquire synchronize with store-release of ht->tail
> +		 * in update_tail.
> +		 */
> +		const uint32_t cons_tail = __atomic_load_n(&r->cons.tail,
> +				__ATOMIC_ACQUIRE);
> +
> +		/* The subtraction is done between two unsigned 32bits value
>  		 * (the result is always modulo 32 bits even if we have
>  		 * *old_head > cons_tail). So 'free_entries' is always between 0
>  		 * and capacity (which is < size).
>  		 */
> -		*free_entries = (capacity + r->cons.tail - *old_head);
> +		*free_entries = (capacity + cons_tail - *old_head);
>
>  		/* check that we have enough room in ring */
>  		if (unlikely(n > *free_entries))
> @@ -87,9 +92,10 @@ __rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp,
>  		if (is_sp)
>  			r->prod.head = *new_head, success = 1;
>  		else
> +			/* on failure, *old_head is updated */
>  			success = __atomic_compare_exchange_n(&r->prod.head,
>  					old_head, *new_head,
> -					0, __ATOMIC_ACQUIRE,
> +					/*weak=*/0, __ATOMIC_RELAXED,
>  					__ATOMIC_RELAXED);
>  	} while (unlikely(success == 0));
>  	return n;
> @@ -128,18 +134,23 @@ __rte_ring_move_cons_head(struct rte_ring *r, int is_sc,
>  	int success;
>
>  	/* move cons.head atomically */
> +	*old_head = __atomic_load_n(&r->cons.head, __ATOMIC_RELAXED);
>  	do {
>  		/* Restore n as it may change every loop */
>  		n = max;
> -		*old_head = __atomic_load_n(&r->cons.head,
> -					__ATOMIC_ACQUIRE);
> +
> +		/* this load-acquire synchronize with store-release of ht->tail
> +		 * in update_tail.
> +		 */
> +		const uint32_t prod_tail = __atomic_load_n(&r->prod.tail,
> +				__ATOMIC_ACQUIRE);
>
>  		/* The subtraction is done between two unsigned 32bits value
>  		 * (the result is always modulo 32 bits even if we have
>  		 * cons_head > prod_tail). So 'entries' is always between 0
>  		 * and size(ring)-1.
>  		 */
> -		*entries = (r->prod.tail - *old_head);
> +		*entries = (prod_tail - *old_head);
>
>  		/* Set the actual entries for dequeue */
>  		if (n > *entries)
> @@ -152,10 +163,11 @@ __rte_ring_move_cons_head(struct rte_ring *r, int is_sc,
>  		if (is_sc)
>  			r->cons.head = *new_head, success = 1;
>  		else
> +			/* on failure, *old_head will be updated */
>  			success = __atomic_compare_exchange_n(&r->cons.head,
> -					old_head, *new_head,
> -					0, __ATOMIC_ACQUIRE,
> -					__ATOMIC_RELAXED);
> +					old_head, *new_head,
> +					/*weak=*/0, __ATOMIC_RELAXED,
> +					__ATOMIC_RELAXED);
>  	} while (unlikely(success == 0));
>  	return n;
>  }
> --
> 2.11.0