Replace the use of GCC builtin __atomic_xxx intrinsics with the corresponding rte_atomic_xxx optional stdatomic API.
Signed-off-by: Tyler Retzlaff <roret...@linux.microsoft.com> --- lib/distributor/distributor_private.h | 4 +-- lib/distributor/rte_distributor.c | 54 +++++++++++++++++------------------ 2 files changed, 29 insertions(+), 29 deletions(-) diff --git a/lib/distributor/distributor_private.h b/lib/distributor/distributor_private.h index 2f29343..dfeb9b5 100644 --- a/lib/distributor/distributor_private.h +++ b/lib/distributor/distributor_private.h @@ -113,12 +113,12 @@ enum rte_distributor_match_function { * There is a separate cacheline for returns in the burst API. */ struct rte_distributor_buffer { - volatile int64_t bufptr64[RTE_DIST_BURST_SIZE] + volatile RTE_ATOMIC(int64_t) bufptr64[RTE_DIST_BURST_SIZE] __rte_cache_aligned; /* <= outgoing to worker */ int64_t pad1 __rte_cache_aligned; /* <= one cache line */ - volatile int64_t retptr64[RTE_DIST_BURST_SIZE] + volatile RTE_ATOMIC(int64_t) retptr64[RTE_DIST_BURST_SIZE] __rte_cache_aligned; /* <= incoming from worker */ int64_t pad2 __rte_cache_aligned; /* <= one cache line */ diff --git a/lib/distributor/rte_distributor.c b/lib/distributor/rte_distributor.c index 5ca80dd..2ecb95c 100644 --- a/lib/distributor/rte_distributor.c +++ b/lib/distributor/rte_distributor.c @@ -38,7 +38,7 @@ struct rte_distributor_buffer *buf = &(d->bufs[worker_id]); unsigned int i; - volatile int64_t *retptr64; + volatile RTE_ATOMIC(int64_t) *retptr64; if (unlikely(d->alg_type == RTE_DIST_ALG_SINGLE)) { rte_distributor_request_pkt_single(d->d_single, @@ -50,7 +50,7 @@ /* Spin while handshake bits are set (scheduler clears it). * Sync with worker on GET_BUF flag. 
*/ - while (unlikely(__atomic_load_n(retptr64, __ATOMIC_ACQUIRE) + while (unlikely(rte_atomic_load_explicit(retptr64, rte_memory_order_acquire) & (RTE_DISTRIB_GET_BUF | RTE_DISTRIB_RETURN_BUF))) { rte_pause(); uint64_t t = rte_rdtsc()+100; @@ -78,8 +78,8 @@ * line is ready for processing * Sync with distributor to release retptrs */ - __atomic_store_n(retptr64, *retptr64 | RTE_DISTRIB_GET_BUF, - __ATOMIC_RELEASE); + rte_atomic_store_explicit(retptr64, *retptr64 | RTE_DISTRIB_GET_BUF, + rte_memory_order_release); } int @@ -102,7 +102,7 @@ * RETURN_BUF is set when distributor must retrieve in-flight packets * Sync with distributor to acquire bufptrs */ - if (__atomic_load_n(&(buf->bufptr64[0]), __ATOMIC_ACQUIRE) + if (rte_atomic_load_explicit(&(buf->bufptr64[0]), rte_memory_order_acquire) & (RTE_DISTRIB_GET_BUF | RTE_DISTRIB_RETURN_BUF)) return -1; @@ -120,8 +120,8 @@ * on the next cacheline while we're working. * Sync with distributor on GET_BUF flag. Release bufptrs. */ - __atomic_store_n(&(buf->bufptr64[0]), - buf->bufptr64[0] | RTE_DISTRIB_GET_BUF, __ATOMIC_RELEASE); + rte_atomic_store_explicit(&(buf->bufptr64[0]), + buf->bufptr64[0] | RTE_DISTRIB_GET_BUF, rte_memory_order_release); return count; } @@ -177,7 +177,7 @@ /* Spin while handshake bits are set (scheduler clears it). * Sync with worker on GET_BUF flag. */ - while (unlikely(__atomic_load_n(&(buf->retptr64[0]), __ATOMIC_RELAXED) + while (unlikely(rte_atomic_load_explicit(&(buf->retptr64[0]), rte_memory_order_relaxed) & (RTE_DISTRIB_GET_BUF | RTE_DISTRIB_RETURN_BUF))) { rte_pause(); uint64_t t = rte_rdtsc()+100; @@ -187,7 +187,7 @@ } /* Sync with distributor to acquire retptrs */ - __atomic_thread_fence(__ATOMIC_ACQUIRE); + __atomic_thread_fence(rte_memory_order_acquire); for (i = 0; i < RTE_DIST_BURST_SIZE; i++) /* Switch off the return bit first */ buf->retptr64[i] = 0; @@ -200,15 +200,15 @@ * we won't read any mbufs from there even if GET_BUF is set. 
* This allows distributor to retrieve in-flight already sent packets. */ - __atomic_fetch_or(&(buf->bufptr64[0]), RTE_DISTRIB_RETURN_BUF, - __ATOMIC_ACQ_REL); + rte_atomic_fetch_or_explicit(&(buf->bufptr64[0]), RTE_DISTRIB_RETURN_BUF, + rte_memory_order_acq_rel); /* set the RETURN_BUF on retptr64 even if we got no returns. * Sync with distributor on RETURN_BUF flag. Release retptrs. * Notify distributor that we don't request more packets any more. */ - __atomic_store_n(&(buf->retptr64[0]), - buf->retptr64[0] | RTE_DISTRIB_RETURN_BUF, __ATOMIC_RELEASE); + rte_atomic_store_explicit(&(buf->retptr64[0]), + buf->retptr64[0] | RTE_DISTRIB_RETURN_BUF, rte_memory_order_release); return 0; } @@ -297,7 +297,7 @@ * to worker which does not require new packets. * They must be retrieved and assigned to another worker. */ - if (!(__atomic_load_n(&(buf->bufptr64[0]), __ATOMIC_ACQUIRE) + if (!(rte_atomic_load_explicit(&(buf->bufptr64[0]), rte_memory_order_acquire) & RTE_DISTRIB_GET_BUF)) for (i = 0; i < RTE_DIST_BURST_SIZE; i++) if (buf->bufptr64[i] & RTE_DISTRIB_VALID_BUF) @@ -310,8 +310,8 @@ * with new packets if worker will make a new request. * - clear RETURN_BUF to unlock reads on worker side. */ - __atomic_store_n(&(buf->bufptr64[0]), RTE_DISTRIB_GET_BUF, - __ATOMIC_RELEASE); + rte_atomic_store_explicit(&(buf->bufptr64[0]), RTE_DISTRIB_GET_BUF, + rte_memory_order_release); /* Collect backlog packets from worker */ for (i = 0; i < d->backlog[wkr].count; i++) @@ -348,7 +348,7 @@ unsigned int i; /* Sync on GET_BUF flag. Acquire retptrs. */ - if (__atomic_load_n(&(buf->retptr64[0]), __ATOMIC_ACQUIRE) + if (rte_atomic_load_explicit(&(buf->retptr64[0]), rte_memory_order_acquire) & (RTE_DISTRIB_GET_BUF | RTE_DISTRIB_RETURN_BUF)) { for (i = 0; i < RTE_DIST_BURST_SIZE; i++) { if (buf->retptr64[i] & RTE_DISTRIB_VALID_BUF) { @@ -379,7 +379,7 @@ /* Clear for the worker to populate with more returns. * Sync with distributor on GET_BUF flag. Release retptrs. 
*/ - __atomic_store_n(&(buf->retptr64[0]), 0, __ATOMIC_RELEASE); + rte_atomic_store_explicit(&(buf->retptr64[0]), 0, rte_memory_order_release); } return count; } @@ -404,7 +404,7 @@ return 0; /* Sync with worker on GET_BUF flag */ - while (!(__atomic_load_n(&(d->bufs[wkr].bufptr64[0]), __ATOMIC_ACQUIRE) + while (!(rte_atomic_load_explicit(&(d->bufs[wkr].bufptr64[0]), rte_memory_order_acquire) & RTE_DISTRIB_GET_BUF)) { handle_returns(d, wkr); if (unlikely(!d->active[wkr])) @@ -430,8 +430,8 @@ /* Clear the GET bit. * Sync with worker on GET_BUF flag. Release bufptrs. */ - __atomic_store_n(&(buf->bufptr64[0]), - buf->bufptr64[0] & ~RTE_DISTRIB_GET_BUF, __ATOMIC_RELEASE); + rte_atomic_store_explicit(&(buf->bufptr64[0]), + buf->bufptr64[0] & ~RTE_DISTRIB_GET_BUF, rte_memory_order_release); return buf->count; } @@ -463,8 +463,8 @@ /* Flush out all non-full cache-lines to workers. */ for (wid = 0 ; wid < d->num_workers; wid++) { /* Sync with worker on GET_BUF flag. */ - if (__atomic_load_n(&(d->bufs[wid].bufptr64[0]), - __ATOMIC_ACQUIRE) & RTE_DISTRIB_GET_BUF) { + if (rte_atomic_load_explicit(&(d->bufs[wid].bufptr64[0]), + rte_memory_order_acquire) & RTE_DISTRIB_GET_BUF) { d->bufs[wid].count = 0; release(d, wid); handle_returns(d, wid); @@ -598,8 +598,8 @@ /* Flush out all non-full cache-lines to workers. */ for (wid = 0 ; wid < d->num_workers; wid++) /* Sync with worker on GET_BUF flag. */ - if ((__atomic_load_n(&(d->bufs[wid].bufptr64[0]), - __ATOMIC_ACQUIRE) & RTE_DISTRIB_GET_BUF)) { + if ((rte_atomic_load_explicit(&(d->bufs[wid].bufptr64[0]), + rte_memory_order_acquire) & RTE_DISTRIB_GET_BUF)) { d->bufs[wid].count = 0; release(d, wid); } @@ -700,8 +700,8 @@ /* throw away returns, so workers can exit */ for (wkr = 0; wkr < d->num_workers; wkr++) /* Sync with worker. Release retptrs. 
*/ - __atomic_store_n(&(d->bufs[wkr].retptr64[0]), 0, - __ATOMIC_RELEASE); + rte_atomic_store_explicit(&(d->bufs[wkr].retptr64[0]), 0, + rte_memory_order_release); d->returns.start = d->returns.count = 0; } -- 1.8.3.1