Using 'rte_mb' to synchronize the shared ring head/tail between producer and consumer stalls the pipeline and hurts performance on platforms with a weak memory model, such as aarch64.
Relaxing the expensive barrier by using C11 atomics with explicit memory ordering improves throughput by 3.6%. Signed-off-by: Phil Yang <phil.y...@arm.com> Reviewed-by: Ruifeng Wang <ruifeng.w...@arm.com> --- drivers/net/memif/rte_eth_memif.c | 35 +++++++++++++++++++++++++---------- 1 file changed, 25 insertions(+), 10 deletions(-) diff --git a/drivers/net/memif/rte_eth_memif.c b/drivers/net/memif/rte_eth_memif.c index c1c7e9f..a19c0f3 100644 --- a/drivers/net/memif/rte_eth_memif.c +++ b/drivers/net/memif/rte_eth_memif.c @@ -253,7 +253,12 @@ memif_free_stored_mbufs(struct pmd_process_private *proc_private, struct memif_q memif_ring_t *ring = memif_get_ring_from_queue(proc_private, mq); /* FIXME: improve performance */ - while (mq->last_tail != ring->tail) { + /* The ring->tail acts as a guard variable between Tx and Rx + * threads, so using load-acquire pairs with store-release + * to synchronize it between threads. + */ + while (mq->last_tail != __atomic_load_n(&ring->tail, + __ATOMIC_ACQUIRE)) { RTE_MBUF_PREFETCH_TO_FREE(mq->buffers[(mq->last_tail + 1) & mask]); /* Decrement refcnt and free mbuf. (current segment) */ rte_mbuf_refcnt_update(mq->buffers[mq->last_tail & mask], -1); @@ -455,7 +460,11 @@ eth_memif_rx_zc(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts) mask = ring_size - 1; cur_slot = mq->last_tail; - last_slot = ring->tail; + /* The ring->tail acts as a guard variable between Tx and Rx + * threads, so using load-acquire pairs with store-release + * to synchronize it between threads. + */ + last_slot = __atomic_load_n(&ring->tail, __ATOMIC_ACQUIRE); if (cur_slot == last_slot) goto refill; n_slots = last_slot - cur_slot; @@ -501,7 +510,11 @@ eth_memif_rx_zc(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts) /* Supply master with new buffers */ refill: - head = ring->head; + /* The ring->head acts as a guard variable between Tx and Rx + * threads, so using load-acquire pairs with store-release + * to synchronize it between threads. 
+ */ + head = __atomic_load_n(&ring->head, __ATOMIC_ACQUIRE); n_slots = ring_size - head + mq->last_tail; if (n_slots < 32) @@ -526,8 +539,7 @@ eth_memif_rx_zc(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts) (uint8_t *)proc_private->regions[d0->region]->addr; } no_free_mbufs: - rte_mb(); - ring->head = head; + __atomic_store_n(&ring->head, head, __ATOMIC_RELEASE); mq->n_pkts += n_rx_pkts; @@ -723,8 +735,12 @@ eth_memif_tx_zc(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts) memif_free_stored_mbufs(proc_private, mq); /* ring type always MEMIF_RING_S2M */ - slot = ring->head; - n_free = ring_size - ring->head + mq->last_tail; + /* The ring->head acts as a guard variable between Tx and Rx + * threads, so using load-acquire pairs with store-release + * to synchronize it between threads. + */ + slot = __atomic_load_n(&ring->head, __ATOMIC_ACQUIRE); + n_free = ring_size - slot + mq->last_tail; int used_slots; @@ -778,12 +794,11 @@ eth_memif_tx_zc(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts) } no_free_slots: - rte_mb(); /* update ring pointers */ if (type == MEMIF_RING_S2M) - ring->head = slot; + __atomic_store_n(&ring->head, slot, __ATOMIC_RELEASE); else - ring->tail = slot; + __atomic_store_n(&ring->tail, slot, __ATOMIC_RELEASE); /* Send interrupt, if enabled. */ if ((ring->flags & MEMIF_RING_FLAG_MASK_INT) == 0) { -- 2.7.4