> -----Original Message-----
> From: Honnappa Nagarahalli <honnappa.nagaraha...@arm.com>
> Sent: Sunday, October 13, 2019 10:32
> To: Ruifeng Wang (Arm Technology China) <ruifeng.w...@arm.com>;
> david.h...@intel.com
> Cc: dev@dpdk.org; hka...@marvell.com; Gavin Hu (Arm Technology China)
> <gavin...@arm.com>; nd <n...@arm.com>; Ruifeng Wang (Arm Technology
> China) <ruifeng.w...@arm.com>; sta...@dpdk.org; Honnappa Nagarahalli
> <honnappa.nagaraha...@arm.com>; nd <n...@arm.com>
> Subject: RE: [PATCH v2 1/2] lib/distributor: fix deadlock issue for aarch64
> 
> Hi Ruifeng,
>       Typically, we have followed the convention of adding comments
> whenever C11 atomic APIs are used. Can you please add comments
> indicating why acquire or release semantics are used?
> 
OK. Comments will be added to explain the acquire/release semantics used.
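
For reference, here is a minimal sketch of the style of comment intended, using a simplified flag/data word. The names flag_and_data, GET_BUF_FLAG, wait_for_slot and publish_slot are illustrative only and not part of the patch, which operates on retptr64/bufptr64 as shown in the quoted diff below.

#include <stdint.h>
#include <rte_pause.h>

static uint64_t flag_and_data;	/* handshake flag in the low bit, data above it */
#define GET_BUF_FLAG 0x1ULL

/* Worker side: wait until the distributor has cleared the handshake bit.
 * Acquire: loads of the cache-line contents issued after this loop cannot
 * be reordered before the load that observes the cleared bit, so the
 * worker never reads stale contents.
 */
static inline void
wait_for_slot(void)
{
	while (__atomic_load_n(&flag_and_data, __ATOMIC_ACQUIRE) & GET_BUF_FLAG)
		rte_pause();
}

/* Worker side: publish a filled cache line to the distributor.
 * Release: all stores to the cache line made before this call become
 * visible before the distributor can observe the handshake bit as set.
 */
static inline void
publish_slot(uint64_t data)
{
	__atomic_store_n(&flag_and_data, data | GET_BUF_FLAG, __ATOMIC_RELEASE);
}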

> > -----Original Message-----
> > From: Ruifeng Wang <ruifeng.w...@arm.com>
> > Sent: Friday, October 11, 2019 9:44 PM
> > To: david.h...@intel.com
> > Cc: dev@dpdk.org; hka...@marvell.com; Gavin Hu (Arm Technology China)
> > <gavin...@arm.com>; Honnappa Nagarahalli
> > <honnappa.nagaraha...@arm.com>; nd <n...@arm.com>; Ruifeng Wang (Arm
> > Technology China) <ruifeng.w...@arm.com>; sta...@dpdk.org
> > Subject: [PATCH v2 1/2] lib/distributor: fix deadlock issue for
> > aarch64
> >
> > Distributor and worker threads rely on data structs in a cache line for
> > synchronization. The shared data structs were not protected.
> > This caused a deadlock issue on weaker memory ordering platforms such as
> > aarch64.
> > Fix this issue by adding memory barriers to ensure synchronization
> > among cores.
> >
> > Bugzilla ID: 342
> > Fixes: 775003ad2f96 ("distributor: add new burst-capable library")
> > Cc: sta...@dpdk.org
> >
> > Signed-off-by: Ruifeng Wang <ruifeng.w...@arm.com>
> > Reviewed-by: Gavin Hu <gavin...@arm.com>
> > ---
> >  lib/librte_distributor/meson.build           |  5 ++
> >  lib/librte_distributor/rte_distributor.c     | 39 ++++++++++------
> >  lib/librte_distributor/rte_distributor_v20.c | 49 +++++++++++++-------
> >  3 files changed, 63 insertions(+), 30 deletions(-)
> >
> > diff --git a/lib/librte_distributor/meson.build b/lib/librte_distributor/meson.build
> > index dba7e3b2a..26577dbc1 100644
> > --- a/lib/librte_distributor/meson.build
> > +++ b/lib/librte_distributor/meson.build
> > @@ -9,3 +9,8 @@ else
> >  endif
> >  headers = files('rte_distributor.h')
> >  deps += ['mbuf']
> > +
> > +# for clang 32-bit compiles we need libatomic for 64-bit atomic ops
> > +if cc.get_id() == 'clang' and dpdk_conf.get('RTE_ARCH_64') == false
> > +   ext_deps += cc.find_library('atomic')
> > +endif
> > diff --git a/lib/librte_distributor/rte_distributor.c b/lib/librte_distributor/rte_distributor.c
> > index 21eb1fb0a..b653146d0 100644
> > --- a/lib/librte_distributor/rte_distributor.c
> > +++ b/lib/librte_distributor/rte_distributor.c
> > @@ -50,7 +50,8 @@ rte_distributor_request_pkt_v1705(struct rte_distributor *d,
> >
> >     retptr64 = &(buf->retptr64[0]);
> >     /* Spin while handshake bits are set (scheduler clears it) */
> > -   while (unlikely(*retptr64 & RTE_DISTRIB_GET_BUF)) {
> > +   while (unlikely(__atomic_load_n(retptr64, __ATOMIC_ACQUIRE)
> > +                   & RTE_DISTRIB_GET_BUF)) {
> >             rte_pause();
> >             uint64_t t = rte_rdtsc()+100;
> >
> > @@ -76,7 +77,8 @@ rte_distributor_request_pkt_v1705(struct rte_distributor *d,
> >      * Finally, set the GET_BUF  to signal to distributor that cache
> >      * line is ready for processing
> >      */
> > -   *retptr64 |= RTE_DISTRIB_GET_BUF;
> > +   __atomic_store_n(retptr64, *retptr64 | RTE_DISTRIB_GET_BUF,
> > +                   __ATOMIC_RELEASE);
> >  }
> >  BIND_DEFAULT_SYMBOL(rte_distributor_request_pkt, _v1705, 17.05);
> > MAP_STATIC_SYMBOL(void rte_distributor_request_pkt(struct rte_distributor *d,
> > @@ -99,7 +101,8 @@ rte_distributor_poll_pkt_v1705(struct rte_distributor *d,
> >     }
> >
> >     /* If bit is set, return */
> > -   if (buf->bufptr64[0] & RTE_DISTRIB_GET_BUF)
> > +   if (__atomic_load_n(&(buf->bufptr64[0]), __ATOMIC_ACQUIRE)
> > +           & RTE_DISTRIB_GET_BUF)
> >             return -1;
> >
> >     /* since bufptr64 is signed, this should be an arithmetic shift */
> > @@ -115,7 +118,8 @@ rte_distributor_poll_pkt_v1705(struct rte_distributor *d,
> >      * mbuf pointers, so toggle the bit so scheduler can start working
> >      * on the next cacheline while we're working.
> >      */
> > -   buf->bufptr64[0] |= RTE_DISTRIB_GET_BUF;
> > +   __atomic_store_n(&(buf->bufptr64[0]),
> > +           buf->bufptr64[0] | RTE_DISTRIB_GET_BUF, __ATOMIC_RELEASE);
> >
> >     return count;
> >  }
> > @@ -174,6 +178,7 @@ rte_distributor_return_pkt_v1705(struct rte_distributor *d,
> >                     return -EINVAL;
> >     }
> >
> > +   __atomic_thread_fence(__ATOMIC_ACQUIRE);
> >     for (i = 0; i < RTE_DIST_BURST_SIZE; i++)
> >             /* Switch off the return bit first */
> >             buf->retptr64[i] &= ~RTE_DISTRIB_RETURN_BUF;
> > @@ -183,7 +188,8 @@ rte_distributor_return_pkt_v1705(struct rte_distributor *d,
> >                     RTE_DISTRIB_FLAG_BITS) | RTE_DISTRIB_RETURN_BUF;
> >
> >     /* set the GET_BUF but even if we got no returns */
> > -   buf->retptr64[0] |= RTE_DISTRIB_GET_BUF;
> > +   __atomic_store_n(&(buf->retptr64[0]),
> > +           buf->retptr64[0] | RTE_DISTRIB_GET_BUF, __ATOMIC_RELEASE);
> >
> >     return 0;
> >  }
> > @@ -273,7 +279,8 @@ handle_returns(struct rte_distributor *d, unsigned int wkr)
> >     unsigned int count = 0;
> >     unsigned int i;
> >
> > -   if (buf->retptr64[0] & RTE_DISTRIB_GET_BUF) {
> > +   if (__atomic_load_n(&(buf->retptr64[0]), __ATOMIC_ACQUIRE)
> > +           & RTE_DISTRIB_GET_BUF) {
> >             for (i = 0; i < RTE_DIST_BURST_SIZE; i++) {
> >                     if (buf->retptr64[i] & RTE_DISTRIB_RETURN_BUF) {
> >                             oldbuf = ((uintptr_t)(buf->retptr64[i] >>
> > @@ -287,7 +294,7 @@ handle_returns(struct rte_distributor *d, unsigned int wkr)
> >             d->returns.start = ret_start;
> >             d->returns.count = ret_count;
> >             /* Clear for the worker to populate with more returns */
> > -           buf->retptr64[0] = 0;
> > +           __atomic_store_n(&(buf->retptr64[0]), 0, __ATOMIC_RELEASE);
> >     }
> >     return count;
> >  }
> > @@ -307,7 +314,8 @@ release(struct rte_distributor *d, unsigned int wkr)
> >     struct rte_distributor_buffer *buf = &(d->bufs[wkr]);
> >     unsigned int i;
> >
> > -   while (!(d->bufs[wkr].bufptr64[0] & RTE_DISTRIB_GET_BUF))
> > +   while (!(__atomic_load_n(&(d->bufs[wkr].bufptr64[0]), __ATOMIC_ACQUIRE)
> > +           & RTE_DISTRIB_GET_BUF))
> >             rte_pause();
> >
> >     handle_returns(d, wkr);
> > @@ -328,7 +336,8 @@ release(struct rte_distributor *d, unsigned int wkr)
> >     d->backlog[wkr].count = 0;
> >
> >     /* Clear the GET bit */
> > -   buf->bufptr64[0] &= ~RTE_DISTRIB_GET_BUF;
> > +   __atomic_store_n(&(buf->bufptr64[0]),
> > +           buf->bufptr64[0] & ~RTE_DISTRIB_GET_BUF, __ATOMIC_RELEASE);
> >     return  buf->count;
> >
> >  }
> > @@ -355,7 +364,8 @@ rte_distributor_process_v1705(struct rte_distributor *d,
> >     if (unlikely(num_mbufs == 0)) {
> >             /* Flush out all non-full cache-lines to workers. */
> >             for (wid = 0 ; wid < d->num_workers; wid++) {
> > -                   if (d->bufs[wid].bufptr64[0] & RTE_DISTRIB_GET_BUF) {
> > +                   if (__atomic_load_n(&(d->bufs[wid].bufptr64[0]),
> > +                           __ATOMIC_ACQUIRE) & RTE_DISTRIB_GET_BUF) {
> >                             release(d, wid);
> >                             handle_returns(d, wid);
> >                     }
> > @@ -367,7 +377,8 @@ rte_distributor_process_v1705(struct rte_distributor *d,
> >             uint16_t matches[RTE_DIST_BURST_SIZE];
> >             unsigned int pkts;
> >
> > -           if (d->bufs[wkr].bufptr64[0] & RTE_DISTRIB_GET_BUF)
> > +           if (__atomic_load_n(&(d->bufs[wkr].bufptr64[0]),
> > +                   __ATOMIC_ACQUIRE) & RTE_DISTRIB_GET_BUF)
> >                     d->bufs[wkr].count = 0;
> >
> >             if ((num_mbufs - next_idx) < RTE_DIST_BURST_SIZE)
> > @@ -465,7 +476,8 @@ rte_distributor_process_v1705(struct rte_distributor *d,
> >
> >     /* Flush out all non-full cache-lines to workers. */
> >     for (wid = 0 ; wid < d->num_workers; wid++)
> > -           if ((d->bufs[wid].bufptr64[0] & RTE_DISTRIB_GET_BUF))
> > +           if ((__atomic_load_n(&(d->bufs[wid].bufptr64[0]),
> > +                   __ATOMIC_ACQUIRE) & RTE_DISTRIB_GET_BUF))
> >                     release(d, wid);
> >
> >     return num_mbufs;
> > @@ -574,7 +586,8 @@ rte_distributor_clear_returns_v1705(struct rte_distributor *d)
> >
> >     /* throw away returns, so workers can exit */
> >     for (wkr = 0; wkr < d->num_workers; wkr++)
> > -           d->bufs[wkr].retptr64[0] = 0;
> > +           __atomic_store_n(&(d->bufs[wkr].retptr64[0]), 0,
> > +                           __ATOMIC_RELEASE);
> >  }
> >  BIND_DEFAULT_SYMBOL(rte_distributor_clear_returns, _v1705, 17.05);
> > MAP_STATIC_SYMBOL(void rte_distributor_clear_returns(struct rte_distributor *d),
> > diff --git a/lib/librte_distributor/rte_distributor_v20.c b/lib/librte_distributor/rte_distributor_v20.c
> > index cdc0969a8..41411e3c1 100644
> > --- a/lib/librte_distributor/rte_distributor_v20.c
> > +++ b/lib/librte_distributor/rte_distributor_v20.c
> > @@ -34,9 +34,10 @@ rte_distributor_request_pkt_v20(struct rte_distributor_v20 *d,
> >     union rte_distributor_buffer_v20 *buf = &d->bufs[worker_id];
> >     int64_t req = (((int64_t)(uintptr_t)oldpkt) << RTE_DISTRIB_FLAG_BITS)
> >                     | RTE_DISTRIB_GET_BUF;
> > -   while (unlikely(buf->bufptr64 & RTE_DISTRIB_FLAGS_MASK))
> > +   while (unlikely(__atomic_load_n(&(buf->bufptr64), __ATOMIC_ACQUIRE)
> > +           & RTE_DISTRIB_FLAGS_MASK))
> >             rte_pause();
> > -   buf->bufptr64 = req;
> > +   __atomic_store_n(&(buf->bufptr64), req, __ATOMIC_RELEASE);
> >  }
> >  VERSION_SYMBOL(rte_distributor_request_pkt, _v20, 2.0);
> >
> > @@ -45,7 +46,8 @@ rte_distributor_poll_pkt_v20(struct rte_distributor_v20 *d,
> >             unsigned worker_id)
> >  {
> >     union rte_distributor_buffer_v20 *buf = &d->bufs[worker_id];
> > -   if (buf->bufptr64 & RTE_DISTRIB_GET_BUF)
> > +   if (__atomic_load_n(&(buf->bufptr64), __ATOMIC_ACQUIRE)
> > +           & RTE_DISTRIB_GET_BUF)
> >             return NULL;
> >
> >     /* since bufptr64 is signed, this should be an arithmetic shift */
> > @@ -73,7 +75,7 @@ rte_distributor_return_pkt_v20(struct rte_distributor_v20 *d,
> >     union rte_distributor_buffer_v20 *buf = &d->bufs[worker_id];
> >     uint64_t req = (((int64_t)(uintptr_t)oldpkt) << RTE_DISTRIB_FLAG_BITS)
> >                     | RTE_DISTRIB_RETURN_BUF;
> > -   buf->bufptr64 = req;
> > +   __atomic_store_n(&(buf->bufptr64), req, __ATOMIC_RELEASE);
> >     return 0;
> >  }
> >  VERSION_SYMBOL(rte_distributor_return_pkt, _v20, 2.0);
> > @@ -117,7 +119,7 @@ handle_worker_shutdown(struct rte_distributor_v20 *d, unsigned int wkr)
> >  {
> >     d->in_flight_tags[wkr] = 0;
> >     d->in_flight_bitmask &= ~(1UL << wkr);
> > -   d->bufs[wkr].bufptr64 = 0;
> > +   __atomic_store_n(&(d->bufs[wkr].bufptr64), 0, __ATOMIC_RELEASE);
> >     if (unlikely(d->backlog[wkr].count != 0)) {
> >             /* On return of a packet, we need to move the
> >              * queued packets for this core elsewhere.
> > @@ -165,18 +167,23 @@ process_returns(struct rte_distributor_v20 *d)
> >             const int64_t data = d->bufs[wkr].bufptr64;
> >             uintptr_t oldbuf = 0;
> >
> > -           if (data & RTE_DISTRIB_GET_BUF) {
> > +           if (__atomic_load_n(&data, __ATOMIC_ACQUIRE)
> > +                   & RTE_DISTRIB_GET_BUF) {
> >                     flushed++;
> >                     if (d->backlog[wkr].count)
> > -                           d->bufs[wkr].bufptr64 =
> > -                                           backlog_pop(&d->backlog[wkr]);
> > +                           __atomic_store_n(&(d->bufs[wkr].bufptr64),
> > +                                   backlog_pop(&d->backlog[wkr]),
> > +                                   __ATOMIC_RELEASE);
> >                     else {
> > -                           d->bufs[wkr].bufptr64 = RTE_DISTRIB_GET_BUF;
> > +                           __atomic_store_n(&(d->bufs[wkr].bufptr64),
> > +                                   RTE_DISTRIB_GET_BUF,
> > +                                   __ATOMIC_RELEASE);
> >                             d->in_flight_tags[wkr] = 0;
> >                             d->in_flight_bitmask &= ~(1UL << wkr);
> >                     }
> >                     oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
> > -           } else if (data & RTE_DISTRIB_RETURN_BUF) {
> > +           } else if (__atomic_load_n(&data, __ATOMIC_ACQUIRE)
> > +                   & RTE_DISTRIB_RETURN_BUF) {
> >                     handle_worker_shutdown(d, wkr);
> >                     oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
> >             }
> > @@ -251,21 +258,26 @@ rte_distributor_process_v20(struct rte_distributor_v20 *d,
> >                     }
> >             }
> >
> > -           if ((data & RTE_DISTRIB_GET_BUF) &&
> > +           if ((__atomic_load_n(&data, __ATOMIC_ACQUIRE)
> > +                   & RTE_DISTRIB_GET_BUF) &&
> >                             (d->backlog[wkr].count || next_mb)) {
> >
> >                     if (d->backlog[wkr].count)
> > -                           d->bufs[wkr].bufptr64 =
> > -                                           backlog_pop(&d->backlog[wkr]);
> > +                           __atomic_store_n(&(d->bufs[wkr].bufptr64),
> > +                                           backlog_pop(&d->backlog[wkr]),
> > +                                           __ATOMIC_RELEASE);
> >
> >                     else {
> > -                           d->bufs[wkr].bufptr64 = next_value;
> > +                           __atomic_store_n(&(d->bufs[wkr].bufptr64),
> > +                                           next_value,
> > +                                           __ATOMIC_RELEASE);
> >                             d->in_flight_tags[wkr] = new_tag;
> >                             d->in_flight_bitmask |= (1UL << wkr);
> >                             next_mb = NULL;
> >                     }
> >                     oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
> > -           } else if (data & RTE_DISTRIB_RETURN_BUF) {
> > +           } else if (__atomic_load_n(&data, __ATOMIC_ACQUIRE)
> > +                   & RTE_DISTRIB_RETURN_BUF) {
> >                     handle_worker_shutdown(d, wkr);
> >                     oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
> >             }
> > @@ -280,13 +292,16 @@ rte_distributor_process_v20(struct rte_distributor_v20 *d,
> >      * if they are ready */
> >     for (wkr = 0; wkr < d->num_workers; wkr++)
> >             if (d->backlog[wkr].count &&
> > -                           (d->bufs[wkr].bufptr64 & RTE_DISTRIB_GET_BUF)) {
> > +                           (__atomic_load_n(&(d->bufs[wkr].bufptr64),
> > +                           __ATOMIC_ACQUIRE) & RTE_DISTRIB_GET_BUF)) {
> >
> >                     int64_t oldbuf = d->bufs[wkr].bufptr64 >>
> >                                     RTE_DISTRIB_FLAG_BITS;
> >                     store_return(oldbuf, d, &ret_start, &ret_count);
> >
> > -                   d->bufs[wkr].bufptr64 = backlog_pop(&d->backlog[wkr]);
> > +                   __atomic_store_n(&(d->bufs[wkr].bufptr64),
> > +                           backlog_pop(&d->backlog[wkr]),
> > +                           __ATOMIC_RELEASE);
> >             }
> >
> >     d->returns.start = ret_start;
> > --
> > 2.17.1
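
As background for the commit message's point about weaker memory ordering, the sketch below is not distributor code; mbox_data, mbox_flag and the function names are illustrative assumptions. It shows how a plain-store handshake can expose stale data on a weakly ordered CPU such as aarch64, which is how the distributor/worker handshake could stall, and how the release/acquire pairing used throughout the patch prevents it.

#include <stdint.h>

static uint64_t mbox_data;	/* payload written by the producer */
static uint64_t mbox_flag;	/* handshake bit polled by the consumer */

/* Broken on weakly ordered CPUs: the flag store may become visible
 * before the data store, so the consumer can see the flag set while
 * mbox_data still holds a stale value.
 */
static void
produce_plain(uint64_t v)
{
	mbox_data = v;
	mbox_flag = 1;
}

/* Fixed: the release store guarantees every earlier store (mbox_data)
 * is visible before the flag can be observed as set.
 */
static void
produce_release(uint64_t v)
{
	mbox_data = v;
	__atomic_store_n(&mbox_flag, 1, __ATOMIC_RELEASE);
}

/* Consumer: the acquire load pairs with the release store, so once the
 * flag is seen set, the matching mbox_data value is also visible.
 */
static uint64_t
consume(void)
{
	while (!__atomic_load_n(&mbox_flag, __ATOMIC_ACQUIRE))
		;
	return mbox_data;
}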
