Hi Jakub,

I am trying to review this patch, but I am having difficulty understanding
the queue/ring implementation. I would appreciate it if you could help me
understand the logic:
1) The S2M queues are used to send packets from slave to master. My
understanding is that the slave thread would call 'eth_memif_tx_zc' and
the master thread would call 'eth_memif_rx_zc'. Is this correct?

2) The M2S queues are used to send packets from master to slave. Here
the slave thread would call 'eth_memif_rx_zc' and the master thread
would call 'eth_memif_tx_zc'. Is this correct?

Thank you,
Honnappa

> -----Original Message-----
> From: Phil Yang <phil.y...@arm.com>
> Sent: Friday, September 11, 2020 12:38 AM
> To: jgraj...@cisco.com; dev@dpdk.org
> Cc: Honnappa Nagarahalli <honnappa.nagaraha...@arm.com>; Ruifeng Wang
> <ruifeng.w...@arm.com>; nd <n...@arm.com>
> Subject: [PATCH] net/memif: relax barrier for zero copy path
>
> Using 'rte_mb' to synchronize the shared ring head/tail between the
> producer and the consumer stalls the pipeline and hurts performance on
> weak memory model platforms, such as aarch64.
>
> Relaxing the expensive barrier to C11 atomics with explicit memory
> ordering improves throughput by 3.6%.
>
> Signed-off-by: Phil Yang <phil.y...@arm.com>
> Reviewed-by: Ruifeng Wang <ruifeng.w...@arm.com>
> ---
>  drivers/net/memif/rte_eth_memif.c | 35 +++++++++++++++++++++++++----------
>  1 file changed, 25 insertions(+), 10 deletions(-)
>
> diff --git a/drivers/net/memif/rte_eth_memif.c b/drivers/net/memif/rte_eth_memif.c
> index c1c7e9f..a19c0f3 100644
> --- a/drivers/net/memif/rte_eth_memif.c
> +++ b/drivers/net/memif/rte_eth_memif.c
> @@ -253,7 +253,12 @@ memif_free_stored_mbufs(struct pmd_process_private *proc_private, struct memif_q
>  	memif_ring_t *ring = memif_get_ring_from_queue(proc_private, mq);
>
>  	/* FIXME: improve performance */
> -	while (mq->last_tail != ring->tail) {
> +	/* The ring->tail acts as a guard variable between Tx and Rx
> +	 * threads, so using load-acquire pairs with store-release
> +	 * to synchronize it between threads.
> +	 */
> +	while (mq->last_tail != __atomic_load_n(&ring->tail,
> +				__ATOMIC_ACQUIRE)) {
>  		RTE_MBUF_PREFETCH_TO_FREE(mq->buffers[(mq->last_tail + 1) & mask]);
>  		/* Decrement refcnt and free mbuf. (current segment) */
>  		rte_mbuf_refcnt_update(mq->buffers[mq->last_tail & mask], -1);
> @@ -455,7 +460,11 @@ eth_memif_rx_zc(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
>  	mask = ring_size - 1;
>
>  	cur_slot = mq->last_tail;
> -	last_slot = ring->tail;
> +	/* The ring->tail acts as a guard variable between Tx and Rx
> +	 * threads, so using load-acquire pairs with store-release
> +	 * to synchronize it between threads.
> +	 */
> +	last_slot = __atomic_load_n(&ring->tail, __ATOMIC_ACQUIRE);
>  	if (cur_slot == last_slot)
>  		goto refill;
>  	n_slots = last_slot - cur_slot;
> @@ -501,7 +510,11 @@ eth_memif_rx_zc(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
>
>  	/* Supply master with new buffers */
> refill:
> -	head = ring->head;
> +	/* The ring->head acts as a guard variable between Tx and Rx
> +	 * threads, so using load-acquire pairs with store-release
> +	 * to synchronize it between threads.
> +	 */
> +	head = __atomic_load_n(&ring->head, __ATOMIC_ACQUIRE);
>  	n_slots = ring_size - head + mq->last_tail;
>
>  	if (n_slots < 32)
> @@ -526,8 +539,7 @@ eth_memif_rx_zc(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
>  			(uint8_t *)proc_private->regions[d0->region]->addr;
>  	}
> no_free_mbufs:
> -	rte_mb();
> -	ring->head = head;
> +	__atomic_store_n(&ring->head, head, __ATOMIC_RELEASE);
>
>  	mq->n_pkts += n_rx_pkts;
>
> @@ -723,8 +735,12 @@ eth_memif_tx_zc(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
>  		memif_free_stored_mbufs(proc_private, mq);
>
>  	/* ring type always MEMIF_RING_S2M */
> -	slot = ring->head;
> -	n_free = ring_size - ring->head + mq->last_tail;
> +	/* The ring->head acts as a guard variable between Tx and Rx
> +	 * threads, so using load-acquire pairs with store-release
> +	 * to synchronize it between threads.
> +	 */
> +	slot = __atomic_load_n(&ring->head, __ATOMIC_ACQUIRE);
> +	n_free = ring_size - slot + mq->last_tail;
>
>  	int used_slots;
>
> @@ -778,12 +794,11 @@ eth_memif_tx_zc(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
>  	}
>
> no_free_slots:
> -	rte_mb();
>  	/* update ring pointers */
>  	if (type == MEMIF_RING_S2M)
> -		ring->head = slot;
> +		__atomic_store_n(&ring->head, slot, __ATOMIC_RELEASE);
>  	else
> -		ring->tail = slot;
> +		__atomic_store_n(&ring->tail, slot, __ATOMIC_RELEASE);
>
>  	/* Send interrupt, if enabled. */
>  	if ((ring->flags & MEMIF_RING_FLAG_MASK_INT) == 0) {
> --
> 2.7.4
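
To make my mental model concrete: below is a minimal standalone sketch of
the load-acquire/store-release pairing the patch applies to the ring
head/tail. The ring layout and the names (toy_ring, toy_enqueue,
toy_dequeue) are illustrative only, not the actual memif structures.

    #include <stdint.h>
    #include <stddef.h>

    #define RING_SIZE 256u
    #define MASK (RING_SIZE - 1)

    struct toy_ring {
            uint16_t head;              /* written only by the producer */
            uint16_t tail;              /* written only by the consumer */
            void *slots[RING_SIZE];
    };

    /* Producer side: publish one packet. */
    static int
    toy_enqueue(struct toy_ring *r, void *pkt)
    {
            /* Load-acquire of tail pairs with the consumer's
             * store-release, so the consumer is guaranteed to be done
             * with any slot we are about to reuse.
             */
            uint16_t tail = __atomic_load_n(&r->tail, __ATOMIC_ACQUIRE);
            uint16_t head = r->head;    /* only we write head */

            if ((uint16_t)(head - tail) == RING_SIZE)
                    return -1;          /* ring full */

            r->slots[head & MASK] = pkt;
            /* Store-release orders the slot write before the head update,
             * so the consumer's load-acquire always sees a valid slot.
             */
            __atomic_store_n(&r->head, (uint16_t)(head + 1),
                             __ATOMIC_RELEASE);
            return 0;
    }

    /* Consumer side: take one packet, or NULL if the ring is empty. */
    static void *
    toy_dequeue(struct toy_ring *r)
    {
            uint16_t head = __atomic_load_n(&r->head, __ATOMIC_ACQUIRE);
            uint16_t tail = r->tail;    /* only we write tail */

            if (tail == head)
                    return NULL;        /* ring empty */

            void *pkt = r->slots[tail & MASK];
            __atomic_store_n(&r->tail, (uint16_t)(tail + 1),
                             __ATOMIC_RELEASE);
            return pkt;
    }

Each side does a load-acquire on the index the other side owns and a
store-release on the index it owns; that pairing is what allows the patch
to drop the full 'rte_mb' barrier on both paths.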