> -----Original Message-----
> From: Honnappa Nagarahalli <honnappa.nagaraha...@arm.com>
> Sent: Sunday, October 13, 2019 10:32
> To: Ruifeng Wang (Arm Technology China) <ruifeng.w...@arm.com>; david.h...@intel.com
> Cc: dev@dpdk.org; hka...@marvell.com; Gavin Hu (Arm Technology China) <gavin...@arm.com>; nd <n...@arm.com>; Ruifeng Wang (Arm Technology China) <ruifeng.w...@arm.com>; sta...@dpdk.org; Honnappa Nagarahalli <honnappa.nagaraha...@arm.com>; nd <n...@arm.com>
> Subject: RE: [PATCH v2 1/2] lib/distributor: fix deadlock issue for aarch64
>
> Hi Ruifeng,
> Typically, we have followed the convention of adding comments
> whenever C11 atomic APIs are used. Can you please add comments
> indicating why acquire or release semantics are used?
>
OK. Comments will be added to explain acquire/release semantics used.
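
The comments will describe the usual one-sided handshake between a worker
and the distributor: the writer fills the shared cache line and then sets
the handshake flag with a release store, and the reader polls the flag with
an acquire load before touching the data, so the data accesses cannot be
reordered across the flag. A minimal, self-contained sketch of that pattern
(illustrative only, not the distributor code; the flag name, data array and
spin loop are made up for the example):

	#include <stdint.h>

	#define EXAMPLE_GET_BUF (1ULL << 0)	/* handshake flag bit */

	static uint64_t flags;			/* shared flag word */
	static uint64_t payload[7];		/* data guarded by the flag */

	/* Writer: fill the payload, then set the flag with release
	 * semantics so the payload stores cannot be reordered after it.
	 */
	static void publish(uint64_t v)
	{
		payload[0] = v;
		__atomic_store_n(&flags, flags | EXAMPLE_GET_BUF,
				__ATOMIC_RELEASE);
	}

	/* Reader: poll the flag with acquire semantics so the payload
	 * reads cannot be reordered before the flag is observed set.
	 */
	static uint64_t consume(void)
	{
		while (!(__atomic_load_n(&flags, __ATOMIC_ACQUIRE)
				& EXAMPLE_GET_BUF))
			;	/* real code would call rte_pause() here */
		return payload[0];
	}
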
> > -----Original Message-----
> > From: Ruifeng Wang <ruifeng.w...@arm.com>
> > Sent: Friday, October 11, 2019 9:44 PM
> > To: david.h...@intel.com
> > Cc: dev@dpdk.org; hka...@marvell.com; Gavin Hu (Arm Technology China) <gavin...@arm.com>;
> > Honnappa Nagarahalli <honnappa.nagaraha...@arm.com>; nd <n...@arm.com>;
> > Ruifeng Wang (Arm Technology China) <ruifeng.w...@arm.com>; sta...@dpdk.org
> > Subject: [PATCH v2 1/2] lib/distributor: fix deadlock issue for aarch64
> >
> > Distributor and worker threads rely on data structs in cache line for
> > synchronization. The shared data structs were not protected.
> > This caused a deadlock issue on platforms with weaker memory ordering,
> > such as aarch64.
> > Fix this issue by adding memory barriers to ensure synchronization
> > among cores.
> >
> > Bugzilla ID: 342
> > Fixes: 775003ad2f96 ("distributor: add new burst-capable library")
> > Cc: sta...@dpdk.org
> >
> > Signed-off-by: Ruifeng Wang <ruifeng.w...@arm.com>
> > Reviewed-by: Gavin Hu <gavin...@arm.com>
> > ---
> >  lib/librte_distributor/meson.build           |  5 ++
> >  lib/librte_distributor/rte_distributor.c     | 39 ++++++++++------
> >  lib/librte_distributor/rte_distributor_v20.c | 49 +++++++++++++-------
> >  3 files changed, 63 insertions(+), 30 deletions(-)
> >
> > diff --git a/lib/librte_distributor/meson.build b/lib/librte_distributor/meson.build
> > index dba7e3b2a..26577dbc1 100644
> > --- a/lib/librte_distributor/meson.build
> > +++ b/lib/librte_distributor/meson.build
> > @@ -9,3 +9,8 @@ else
> >  endif
> >  headers = files('rte_distributor.h')
> >  deps += ['mbuf']
> > +
> > +# for clang 32-bit compiles we need libatomic for 64-bit atomic ops
> > +if cc.get_id() == 'clang' and dpdk_conf.get('RTE_ARCH_64') == false
> > +	ext_deps += cc.find_library('atomic')
> > +endif
> > diff --git a/lib/librte_distributor/rte_distributor.c b/lib/librte_distributor/rte_distributor.c
> > index 21eb1fb0a..b653146d0 100644
> > --- a/lib/librte_distributor/rte_distributor.c
> > +++ b/lib/librte_distributor/rte_distributor.c
> > @@ -50,7 +50,8 @@ rte_distributor_request_pkt_v1705(struct rte_distributor *d,
> >
> >  	retptr64 = &(buf->retptr64[0]);
> >  	/* Spin while handshake bits are set (scheduler clears it) */
> > -	while (unlikely(*retptr64 & RTE_DISTRIB_GET_BUF)) {
> > +	while (unlikely(__atomic_load_n(retptr64, __ATOMIC_ACQUIRE)
> > +			& RTE_DISTRIB_GET_BUF)) {
> >  		rte_pause();
> >  		uint64_t t = rte_rdtsc()+100;
> >
> > @@ -76,7 +77,8 @@ rte_distributor_request_pkt_v1705(struct rte_distributor *d,
> >  	 * Finally, set the GET_BUF to signal to distributor that cache
> >  	 * line is ready for processing
> >  	 */
> > -	*retptr64 |= RTE_DISTRIB_GET_BUF;
> > +	__atomic_store_n(retptr64, *retptr64 | RTE_DISTRIB_GET_BUF,
> > +			__ATOMIC_RELEASE);
> >  }
> >  BIND_DEFAULT_SYMBOL(rte_distributor_request_pkt, _v1705, 17.05);
> >  MAP_STATIC_SYMBOL(void rte_distributor_request_pkt(struct rte_distributor *d,
> > @@ -99,7 +101,8 @@ rte_distributor_poll_pkt_v1705(struct rte_distributor *d,
> >  	}
> >
> >  	/* If bit is set, return */
> > -	if (buf->bufptr64[0] & RTE_DISTRIB_GET_BUF)
> > +	if (__atomic_load_n(&(buf->bufptr64[0]), __ATOMIC_ACQUIRE)
> > +		& RTE_DISTRIB_GET_BUF)
> >  		return -1;
> >
> >  	/* since bufptr64 is signed, this should be an arithmetic shift */
> > @@ -115,7 +118,8 @@ rte_distributor_poll_pkt_v1705(struct rte_distributor *d,
> >  	 * mbuf pointers, so toggle the bit so scheduler can start working
> >  	 * on the next cacheline while we're working.
> >  	 */
> > -	buf->bufptr64[0] |= RTE_DISTRIB_GET_BUF;
> > +	__atomic_store_n(&(buf->bufptr64[0]),
> > +		buf->bufptr64[0] | RTE_DISTRIB_GET_BUF, __ATOMIC_RELEASE);
> >
> >  	return count;
> >  }
> > @@ -174,6 +178,7 @@ rte_distributor_return_pkt_v1705(struct rte_distributor *d,
> >  		return -EINVAL;
> >  	}
> >
> > +	__atomic_thread_fence(__ATOMIC_ACQUIRE);
> >  	for (i = 0; i < RTE_DIST_BURST_SIZE; i++)
> >  		/* Switch off the return bit first */
> >  		buf->retptr64[i] &= ~RTE_DISTRIB_RETURN_BUF;
> > @@ -183,7 +188,8 @@ rte_distributor_return_pkt_v1705(struct rte_distributor *d,
> >  				RTE_DISTRIB_FLAG_BITS) | RTE_DISTRIB_RETURN_BUF;
> >
> >  	/* set the GET_BUF but even if we got no returns */
> > -	buf->retptr64[0] |= RTE_DISTRIB_GET_BUF;
> > +	__atomic_store_n(&(buf->retptr64[0]),
> > +		buf->retptr64[0] | RTE_DISTRIB_GET_BUF, __ATOMIC_RELEASE);
> >
> >  	return 0;
> >  }
> > @@ -273,7 +279,8 @@ handle_returns(struct rte_distributor *d, unsigned int wkr)
> >  	unsigned int count = 0;
> >  	unsigned int i;
> >
> > -	if (buf->retptr64[0] & RTE_DISTRIB_GET_BUF) {
> > +	if (__atomic_load_n(&(buf->retptr64[0]), __ATOMIC_ACQUIRE)
> > +		& RTE_DISTRIB_GET_BUF) {
> >  		for (i = 0; i < RTE_DIST_BURST_SIZE; i++) {
> >  			if (buf->retptr64[i] & RTE_DISTRIB_RETURN_BUF) {
> >  				oldbuf = ((uintptr_t)(buf->retptr64[i] >>
> > @@ -287,7 +294,7 @@ handle_returns(struct rte_distributor *d, unsigned int wkr)
> >  		d->returns.start = ret_start;
> >  		d->returns.count = ret_count;
> >  		/* Clear for the worker to populate with more returns */
> > -		buf->retptr64[0] = 0;
> > +		__atomic_store_n(&(buf->retptr64[0]), 0, __ATOMIC_RELEASE);
> >  	}
> >  	return count;
> >  }
> > @@ -307,7 +314,8 @@ release(struct rte_distributor *d, unsigned int wkr)
> >  	struct rte_distributor_buffer *buf = &(d->bufs[wkr]);
> >  	unsigned int i;
> >
> > -	while (!(d->bufs[wkr].bufptr64[0] & RTE_DISTRIB_GET_BUF))
> > +	while (!(__atomic_load_n(&(d->bufs[wkr].bufptr64[0]), __ATOMIC_ACQUIRE)
> > +		& RTE_DISTRIB_GET_BUF))
> >  		rte_pause();
> >
> >  	handle_returns(d, wkr);
> > @@ -328,7 +336,8 @@ release(struct rte_distributor *d, unsigned int wkr)
> >  	d->backlog[wkr].count = 0;
> >
> >  	/* Clear the GET bit */
> > -	buf->bufptr64[0] &= ~RTE_DISTRIB_GET_BUF;
> > +	__atomic_store_n(&(buf->bufptr64[0]),
> > +		buf->bufptr64[0] & ~RTE_DISTRIB_GET_BUF, __ATOMIC_RELEASE);
> >  	return buf->count;
> >
> >  }
> > @@ -355,7 +364,8 @@ rte_distributor_process_v1705(struct rte_distributor *d,
> >  	if (unlikely(num_mbufs == 0)) {
> >  		/* Flush out all non-full cache-lines to workers. */
> >  		for (wid = 0 ; wid < d->num_workers; wid++) {
> > -			if (d->bufs[wid].bufptr64[0] & RTE_DISTRIB_GET_BUF) {
> > +			if (__atomic_load_n(&(d->bufs[wid].bufptr64[0]),
> > +				__ATOMIC_ACQUIRE) & RTE_DISTRIB_GET_BUF) {
> >  				release(d, wid);
> >  				handle_returns(d, wid);
> >  			}
> > @@ -367,7 +377,8 @@ rte_distributor_process_v1705(struct rte_distributor *d,
> >  		uint16_t matches[RTE_DIST_BURST_SIZE];
> >  		unsigned int pkts;
> >
> > -		if (d->bufs[wkr].bufptr64[0] & RTE_DISTRIB_GET_BUF)
> > +		if (__atomic_load_n(&(d->bufs[wkr].bufptr64[0]),
> > +			__ATOMIC_ACQUIRE) & RTE_DISTRIB_GET_BUF)
> >  			d->bufs[wkr].count = 0;
> >
> >  		if ((num_mbufs - next_idx) < RTE_DIST_BURST_SIZE)
> > @@ -465,7 +476,8 @@ rte_distributor_process_v1705(struct rte_distributor *d,
> >
> >  	/* Flush out all non-full cache-lines to workers. */
> >  	for (wid = 0 ; wid < d->num_workers; wid++)
> > -		if ((d->bufs[wid].bufptr64[0] & RTE_DISTRIB_GET_BUF))
> > +		if ((__atomic_load_n(&(d->bufs[wid].bufptr64[0]),
> > +			__ATOMIC_ACQUIRE) & RTE_DISTRIB_GET_BUF))
> >  			release(d, wid);
> >
> >  	return num_mbufs;
> > @@ -574,7 +586,8 @@ rte_distributor_clear_returns_v1705(struct rte_distributor *d)
> >
> >  	/* throw away returns, so workers can exit */
> >  	for (wkr = 0; wkr < d->num_workers; wkr++)
> > -		d->bufs[wkr].retptr64[0] = 0;
> > +		__atomic_store_n(&(d->bufs[wkr].retptr64[0]), 0,
> > +				__ATOMIC_RELEASE);
> >  }
> >  BIND_DEFAULT_SYMBOL(rte_distributor_clear_returns, _v1705, 17.05);
> >  MAP_STATIC_SYMBOL(void rte_distributor_clear_returns(struct rte_distributor *d),
> > diff --git a/lib/librte_distributor/rte_distributor_v20.c b/lib/librte_distributor/rte_distributor_v20.c
> > index cdc0969a8..41411e3c1 100644
> > --- a/lib/librte_distributor/rte_distributor_v20.c
> > +++ b/lib/librte_distributor/rte_distributor_v20.c
> > @@ -34,9 +34,10 @@ rte_distributor_request_pkt_v20(struct rte_distributor_v20 *d,
> >  	union rte_distributor_buffer_v20 *buf = &d->bufs[worker_id];
> >  	int64_t req = (((int64_t)(uintptr_t)oldpkt) << RTE_DISTRIB_FLAG_BITS)
> >  			| RTE_DISTRIB_GET_BUF;
> > -	while (unlikely(buf->bufptr64 & RTE_DISTRIB_FLAGS_MASK))
> > +	while (unlikely(__atomic_load_n(&(buf->bufptr64), __ATOMIC_ACQUIRE)
> > +			& RTE_DISTRIB_FLAGS_MASK))
> >  		rte_pause();
> > -	buf->bufptr64 = req;
> > +	__atomic_store_n(&(buf->bufptr64), req, __ATOMIC_RELEASE);
> >  }
> >  VERSION_SYMBOL(rte_distributor_request_pkt, _v20, 2.0);
> >
> > @@ -45,7 +46,8 @@ rte_distributor_poll_pkt_v20(struct rte_distributor_v20 *d,
> >  		unsigned worker_id)
> >  {
> >  	union rte_distributor_buffer_v20 *buf = &d->bufs[worker_id];
> > -	if (buf->bufptr64 & RTE_DISTRIB_GET_BUF)
> > +	if (__atomic_load_n(&(buf->bufptr64), __ATOMIC_ACQUIRE)
> > +		& RTE_DISTRIB_GET_BUF)
> >  		return NULL;
> >
> >  	/* since bufptr64 is signed, this should be an arithmetic shift */
> > @@ -73,7 +75,7 @@ rte_distributor_return_pkt_v20(struct rte_distributor_v20 *d,
> >  	union rte_distributor_buffer_v20 *buf = &d->bufs[worker_id];
> >  	uint64_t req = (((int64_t)(uintptr_t)oldpkt) << RTE_DISTRIB_FLAG_BITS)
> >  			| RTE_DISTRIB_RETURN_BUF;
> > -	buf->bufptr64 = req;
> > +	__atomic_store_n(&(buf->bufptr64), req, __ATOMIC_RELEASE);
> >  	return 0;
> >  }
> >  VERSION_SYMBOL(rte_distributor_return_pkt, _v20, 2.0);
> > @@ -117,7 +119,7 @@ handle_worker_shutdown(struct rte_distributor_v20 *d, unsigned int wkr)
> >  {
> >  	d->in_flight_tags[wkr] = 0;
> >  	d->in_flight_bitmask &= ~(1UL << wkr);
> > -	d->bufs[wkr].bufptr64 = 0;
> > +	__atomic_store_n(&(d->bufs[wkr].bufptr64), 0, __ATOMIC_RELEASE);
> >  	if (unlikely(d->backlog[wkr].count != 0)) {
> >  		/* On return of a packet, we need to move the
> >  		 * queued packets for this core elsewhere.
> > @@ -165,18 +167,23 @@ process_returns(struct rte_distributor_v20 *d)
> >  		const int64_t data = d->bufs[wkr].bufptr64;
> >  		uintptr_t oldbuf = 0;
> >
> > -		if (data & RTE_DISTRIB_GET_BUF) {
> > +		if (__atomic_load_n(&data, __ATOMIC_ACQUIRE)
> > +			& RTE_DISTRIB_GET_BUF) {
> >  			flushed++;
> >  			if (d->backlog[wkr].count)
> > -				d->bufs[wkr].bufptr64 =
> > -						backlog_pop(&d->backlog[wkr]);
> > +				__atomic_store_n(&(d->bufs[wkr].bufptr64),
> > +					backlog_pop(&d->backlog[wkr]),
> > +					__ATOMIC_RELEASE);
> >  			else {
> > -				d->bufs[wkr].bufptr64 = RTE_DISTRIB_GET_BUF;
> > +				__atomic_store_n(&(d->bufs[wkr].bufptr64),
> > +					RTE_DISTRIB_GET_BUF,
> > +					__ATOMIC_RELEASE);
> >  				d->in_flight_tags[wkr] = 0;
> >  				d->in_flight_bitmask &= ~(1UL << wkr);
> >  			}
> >  			oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
> > -		} else if (data & RTE_DISTRIB_RETURN_BUF) {
> > +		} else if (__atomic_load_n(&data, __ATOMIC_ACQUIRE)
> > +			& RTE_DISTRIB_RETURN_BUF) {
> >  			handle_worker_shutdown(d, wkr);
> >  			oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
> >  		}
> > @@ -251,21 +258,26 @@ rte_distributor_process_v20(struct rte_distributor_v20 *d,
> >  			}
> >  		}
> >
> > -		if ((data & RTE_DISTRIB_GET_BUF) &&
> > +		if ((__atomic_load_n(&data, __ATOMIC_ACQUIRE)
> > +			& RTE_DISTRIB_GET_BUF) &&
> >  				(d->backlog[wkr].count || next_mb)) {
> >
> >  			if (d->backlog[wkr].count)
> > -				d->bufs[wkr].bufptr64 =
> > -						backlog_pop(&d->backlog[wkr]);
> > +				__atomic_store_n(&(d->bufs[wkr].bufptr64),
> > +					backlog_pop(&d->backlog[wkr]),
> > +					__ATOMIC_RELEASE);
> >
> >  			else {
> > -				d->bufs[wkr].bufptr64 = next_value;
> > +				__atomic_store_n(&(d->bufs[wkr].bufptr64),
> > +					next_value,
> > +					__ATOMIC_RELEASE);
> >  				d->in_flight_tags[wkr] = new_tag;
> >  				d->in_flight_bitmask |= (1UL << wkr);
> >  				next_mb = NULL;
> >  			}
> >  			oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
> > -		} else if (data & RTE_DISTRIB_RETURN_BUF) {
> > +		} else if (__atomic_load_n(&data, __ATOMIC_ACQUIRE)
> > +			& RTE_DISTRIB_RETURN_BUF) {
> >  			handle_worker_shutdown(d, wkr);
> >  			oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
> >  		}
> > @@ -280,13 +292,16 @@ rte_distributor_process_v20(struct rte_distributor_v20 *d,
> >  	 * if they are ready */
> >  	for (wkr = 0; wkr < d->num_workers; wkr++)
> >  		if (d->backlog[wkr].count &&
> > -				(d->bufs[wkr].bufptr64 & RTE_DISTRIB_GET_BUF)) {
> > +				(__atomic_load_n(&(d->bufs[wkr].bufptr64),
> > +				__ATOMIC_ACQUIRE) & RTE_DISTRIB_GET_BUF)) {
> >
> >  			int64_t oldbuf = d->bufs[wkr].bufptr64 >>
> >  					RTE_DISTRIB_FLAG_BITS;
> >  			store_return(oldbuf, d, &ret_start, &ret_count);
> >
> > -			d->bufs[wkr].bufptr64 = backlog_pop(&d->backlog[wkr]);
> > +			__atomic_store_n(&(d->bufs[wkr].bufptr64),
> > +				backlog_pop(&d->backlog[wkr]),
> > +				__ATOMIC_RELEASE);
> >  		}
> >
> >  	d->returns.start = ret_start;
> > --
> > 2.17.1
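
One note for the planned comments: the lone __atomic_thread_fence(__ATOMIC_ACQUIRE)
added in rte_distributor_return_pkt_v1705() is the fence form of the same
acquire ordering, intended to order the plain accesses to the retptr64[] slots
that follow it rather than tagging each access individually. A generic sketch
of the fence-based pairing (again illustrative only; the flag and data names
are made up, and the relaxed flag accesses rely entirely on the fences for
ordering):

	#include <stdint.h>

	static uint64_t flag;
	static uint64_t data;

	static void writer(void)
	{
		data = 42;
		/* order the data store before the flag store */
		__atomic_thread_fence(__ATOMIC_RELEASE);
		__atomic_store_n(&flag, 1, __ATOMIC_RELAXED);
	}

	static uint64_t reader(void)
	{
		while (__atomic_load_n(&flag, __ATOMIC_RELAXED) == 0)
			;	/* spin; real code would call rte_pause() */
		/* order the data load after the flag load */
		__atomic_thread_fence(__ATOMIC_ACQUIRE);
		return data;
	}
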