> -----Original Message----- > From: Ilya Maximets <i.maxim...@samsung.com> > Sent: Thursday, March 7, 2019 5:48 PM > To: Gavin Hu (Arm Technology China) <gavin...@arm.com>; dev@dpdk.org > Cc: nd <n...@arm.com>; tho...@monjalon.net; jer...@marvell.com; > hemant.agra...@nxp.com; nipun.gu...@nxp.com; Honnappa Nagarahalli > <honnappa.nagaraha...@arm.com>; olivier.m...@6wind.com > Subject: Re: [PATCH v2] ring: enforce reading the tails before ring > operations > > On 07.03.2019 12:27, Gavin Hu (Arm Technology China) wrote: > > > > > >> -----Original Message----- > >> From: Ilya Maximets <i.maxim...@samsung.com> > >> Sent: Thursday, March 7, 2019 4:52 PM > >> To: Gavin Hu (Arm Technology China) <gavin...@arm.com>; > >> dev@dpdk.org > >> Cc: nd <n...@arm.com>; tho...@monjalon.net; jer...@marvell.com; > >> hemant.agra...@nxp.com; nipun.gu...@nxp.com; Honnappa Nagarahalli > >> <honnappa.nagaraha...@arm.com>; olivier.m...@6wind.com > >> Subject: Re: [PATCH v2] ring: enforce reading the tails before ring > >> operations > >> > >> On 07.03.2019 9:45, gavin hu wrote: > >>> In weak memory models, like arm64, reading the {prod,cons}.tail may get > >>> reordered after reading or writing the ring slots, which corrupts the ring > >>> and stale data is observed. > >>> > >>> This issue was reported by NXP on 8-A72 DPAA2 board. The problem is > >> most > >>> likely caused by missing the acquire semantics when reading cons.tail (in > >>> SP enqueue) or prod.tail (in SC dequeue) which makes it possible to > read > >> a > >>> stale value from the ring slots. > >>> > >>> For MP (and MC) case, rte_atomic32_cmpset() already provides the > >> required > >>> ordering. This patch is to prevent reading and writing the ring slots get > >>> reordered before reading {prod,cons}.tail for SP (and SC) case. > >> > >> Read barrier rte_smp_rmb() is OK to prevent reading the ring get > >> reordered > >> before reading the tail. However, to prevent *writing* the ring get > >> reordered > >> *before reading* the tail you need a full memory barrier, i.e. > >> rte_smp_mb(). > > > > ISHLD(rte_smp_rmb is DMB(ishld) orders LD/LD and LD/ST, while WMB(ST > Option) orders ST/ST. > > For more details, please refer to: Table B2-1 Encoding of the DMB and DSB > <option> parameter in > > https://developer.arm.com/docs/ddi0487/latest/arm-architecture- > reference-manual-armv8-for-armv8-a-architecture-profile > > I see. But you have to change the rte_smp_rmb() function definition in > lib/librte_eal/common/include/generic/rte_atomic.h and assure that all > other architectures follows same rules. > Otherwise, this change is logically wrong, because read barrier in current > definition could not be used to order Load with Store. >
Good points, let me re-think how to handle for other architectures. Full MB is required for other architectures(x86? Ppc?), but for arm, read barrier(load/store and load/load) is enough. > > > >> > >>> > >>> Signed-off-by: gavin hu <gavin...@arm.com> > >>> Reviewed-by: Ola Liljedahl <ola.liljed...@arm.com> > >>> Tested-by: Nipun Gupta <nipun.gu...@nxp.com> > >>> --- > >>> lib/librte_ring/rte_ring_generic.h | 16 ++++++++++------ > >>> 1 file changed, 10 insertions(+), 6 deletions(-) > >>> > >>> diff --git a/lib/librte_ring/rte_ring_generic.h > >> b/lib/librte_ring/rte_ring_generic.h > >>> index ea7dbe5..1bd3dfd 100644 > >>> --- a/lib/librte_ring/rte_ring_generic.h > >>> +++ b/lib/librte_ring/rte_ring_generic.h > >>> @@ -90,9 +90,11 @@ __rte_ring_move_prod_head(struct rte_ring *r, > >> unsigned int is_sp, > >>> return 0; > >>> > >>> *new_head = *old_head + n; > >>> - if (is_sp) > >>> - r->prod.head = *new_head, success = 1; > >>> - else > >>> + if (is_sp) { > >>> + r->prod.head = *new_head; > >>> + rte_smp_rmb(); > >>> + success = 1; > >>> + } else > >>> success = rte_atomic32_cmpset(&r->prod.head, > >>> *old_head, *new_head); > >>> } while (unlikely(success == 0)); > >>> @@ -158,9 +160,11 @@ __rte_ring_move_cons_head(struct rte_ring > *r, > >> unsigned int is_sc, > >>> return 0; > >>> > >>> *new_head = *old_head + n; > >>> - if (is_sc) > >>> - r->cons.head = *new_head, success = 1; > >>> - else > >>> + if (is_sc) { > >>> + r->cons.head = *new_head; > >>> + rte_smp_rmb(); > >>> + success = 1; > >>> + } else > >>> success = rte_atomic32_cmpset(&r->cons.head, > >> *old_head, > >>> *new_head); > >>> } while (unlikely(success == 0)); > >>>