-----Original Message----- > Date: Thu, 2 Nov 2017 08:43:30 +0000 > From: Jia He <hejia...@gmail.com> > To: jerin.ja...@caviumnetworks.com, dev@dpdk.org, olivier.m...@6wind.com > Cc: konstantin.anan...@intel.com, bruce.richard...@intel.com, > jianbo....@arm.com, hemant.agra...@nxp.com, Jia He <hejia...@gmail.com>, > jie2....@hxt-semitech.com, bing.z...@hxt-semitech.com, > jia...@hxt-semitech.com > Subject: [PATCH v2] ring: guarantee ordering of cons/prod loading when doing > X-Mailer: git-send-email 2.7.4 > > We watched a rte panic of mbuf_autotest in our qualcomm arm64 server. > As for the possible race condition, please refer to [1].
Hi Jia, In addition to Konstantin comments. Please find below some review comments. > > Furthermore, there are 2 options as suggested by Jerin: > 1. use rte_smp_rmb > 2. use load_acquire/store_release(refer to [2]). > CONFIG_RTE_ATOMIC_ACQUIRE_RELEASE_BARRIER_PREFER is provided, and by I think, The better name would be CONFIG_RTE_RING_USE_C11_MEM_MODEL or something like that. > default it is n; > > --- > Changelog: > V2: let users choose whether using load_acquire/store_release > V1: rte_smp_rmb() between 2 loads > > Signed-off-by: Jia He <hejia...@gmail.com> > Signed-off-by: jie2....@hxt-semitech.com > Signed-off-by: bing.z...@hxt-semitech.com > Signed-off-by: jia...@hxt-semitech.com > Suggested-by: jerin.ja...@caviumnetworks.com > --- > lib/librte_ring/Makefile | 4 +++- > lib/librte_ring/rte_ring.h | 38 ++++++++++++++++++++++++------ > lib/librte_ring/rte_ring_arm64.h | 48 > ++++++++++++++++++++++++++++++++++++++ > lib/librte_ring/rte_ring_generic.h | 45 +++++++++++++++++++++++++++++++++++ > 4 files changed, 127 insertions(+), 8 deletions(-) > create mode 100644 lib/librte_ring/rte_ring_arm64.h > create mode 100644 lib/librte_ring/rte_ring_generic.h > > diff --git a/lib/librte_ring/Makefile b/lib/librte_ring/Makefile > index e34d9d9..fa57a86 100644 > --- a/lib/librte_ring/Makefile > +++ b/lib/librte_ring/Makefile > @@ -45,6 +45,8 @@ LIBABIVER := 1 > SRCS-$(CONFIG_RTE_LIBRTE_RING) := rte_ring.c > > # install includes > -SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_ring.h > +SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_ring.h \ > + rte_ring_arm64.h \ It is really not specific to arm64. We could rename it to rte_ring_c11_mem.h or something like that to reflect the implementation based on c11 memory model. > + rte_ring_generic.h > > include $(RTE_SDK)/mk/rte.lib.mk > diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h > index 5e9b3b7..943b1f9 100644 > --- a/lib/librte_ring/rte_ring.h > +++ b/lib/librte_ring/rte_ring.h > @@ -103,6 +103,18 @@ extern "C" { > #include <rte_memzone.h> > #include <rte_pause.h> > > +/* In those strong memory models (e.g. x86), there is no need to add rmb() > + * between load and load. > + * In those weak models(powerpc/arm), there are 2 choices for the users > + * 1.use rmb() memory barrier > + * 2.use one-direcion load_acquire/store_release barrier > + * It depends on performance test results. */ > +#ifdef RTE_ARCH_ARM64 s/RTE_ARCH_ARM64/RTE_RING_USE_C11_MEM_MODEL and update the generic arm64 config. By that way it can used by another architecture like ppc if they choose to do so. > +#include "rte_ring_arm64.h" > +#else > +#include "rte_ring_generic.h" > +#endif > + > #define RTE_TAILQ_RING_NAME "RTE_RING" > > enum rte_ring_queue_behavior { > @@ -368,7 +380,7 @@ update_tail(struct rte_ring_headtail *ht, uint32_t > old_val, uint32_t new_val, > while (unlikely(ht->tail != old_val)) > rte_pause(); > > - ht->tail = new_val; > + arch_rte_atomic_store(&ht->tail, new_val, __ATOMIC_RELEASE); > } > > /** > @@ -408,7 +420,8 @@ __rte_ring_move_prod_head(struct rte_ring *r, int is_sp, > /* Reset n to the initial burst count */ > n = max; > > - *old_head = r->prod.head; > + *old_head = arch_rte_atomic_load(&r->prod.head, > + __ATOMIC_ACQUIRE); Same as Konstantin comments, i.e move to this function to c11 memory model header file > const uint32_t cons_tail = r->cons.tail; > /* > * The subtraction is done between two unsigned 32bits value > @@ -430,8 +443,10 @@ __rte_ring_move_prod_head(struct rte_ring *r, int is_sp, > if (is_sp) > r->prod.head = *new_head, success = 1; > else > - success = rte_atomic32_cmpset(&r->prod.head, > - *old_head, *new_head); > + success = arch_rte_atomic32_cmpset(&r->prod.head, > + old_head, *new_head, > + 0, __ATOMIC_ACQUIRE, > + __ATOMIC_RELAXED); > } while (unlikely(success == 0)); > return n; > } > @@ -470,7 +485,10 @@ __rte_ring_do_enqueue(struct rte_ring *r, void * const > *obj_table, > goto end; > > ENQUEUE_PTRS(r, &r[1], prod_head, obj_table, n, void *); > + > +#ifndef RTE_ATOMIC_ACQUIRE_RELEASE_BARRIER_PREFER This #define will be removed if the function moves. > rte_smp_wmb(); > +#endif > > update_tail(&r->prod, prod_head, prod_next, is_sp); > end: > @@ -516,7 +534,8 @@ __rte_ring_move_cons_head(struct rte_ring *r, int is_sc, > /* Restore n as it may change every loop */ > n = max; > > - *old_head = r->cons.head; > + *old_head = arch_rte_atomic_load(&r->cons.head, > + __ATOMIC_ACQUIRE); > const uint32_t prod_tail = r->prod.tail; > /* The subtraction is done between two unsigned 32bits value > * (the result is always modulo 32 bits even if we have > @@ -535,8 +554,10 @@ __rte_ring_move_cons_head(struct rte_ring *r, int is_sc, > if (is_sc) > r->cons.head = *new_head, success = 1; > else > - success = rte_atomic32_cmpset(&r->cons.head, *old_head, > - *new_head); > + success = arch_rte_atomic32_cmpset(&r->cons.head, > + old_head, *new_head, > + 0, __ATOMIC_ACQUIRE, > + __ATOMIC_RELAXED); > } while (unlikely(success == 0)); > return n; > } > @@ -575,7 +596,10 @@ __rte_ring_do_dequeue(struct rte_ring *r, void > **obj_table, > goto end; > > DEQUEUE_PTRS(r, &r[1], cons_head, obj_table, n, void *); > + > +#ifndef RTE_ATOMIC_ACQUIRE_RELEASE_BARRIER_PREFER This #define will be removed if the function moves. > rte_smp_rmb(); > +#endif