Hi Jakub,
        I am trying to review this patch. I am having difficulty
understanding the implementation of the queue/ring, and would appreciate it
if you could help me understand the logic.

1) The S2M queues are used to send packets from slave to master. My
understanding is that the slave thread would call 'eth_memif_tx_zc' and the
master thread would call 'eth_memif_rx_zc'. Is this correct?
2) The M2S queues are used to send packets from master to slave. Here the
slave thread would call 'eth_memif_rx_zc' and the master thread would call
'eth_memif_tx_zc'. Is this correct?

Thank you,
Honnappa

> -----Original Message-----
> From: Phil Yang <phil.y...@arm.com>
> Sent: Friday, September 11, 2020 12:38 AM
> To: jgraj...@cisco.com; dev@dpdk.org
> Cc: Honnappa Nagarahalli <honnappa.nagaraha...@arm.com>; Ruifeng Wang
> <ruifeng.w...@arm.com>; nd <n...@arm.com>
> Subject: [PATCH] net/memif: relax barrier for zero copy path
> 
> Using 'rte_mb' to synchronize the shared ring head/tail between producer
> and consumer stalls the pipeline and hurts performance on weak memory
> model platforms such as aarch64.
> 
> Relaxing the expensive barrier to C11 atomics with explicit memory
> ordering can improve throughput by 3.6%.
> 
> Signed-off-by: Phil Yang <phil.y...@arm.com>
> Reviewed-by: Ruifeng Wang <ruifeng.w...@arm.com>
> ---
>  drivers/net/memif/rte_eth_memif.c | 35 +++++++++++++++++++++++++------
> ----
>  1 file changed, 25 insertions(+), 10 deletions(-)
> 
> diff --git a/drivers/net/memif/rte_eth_memif.c
> b/drivers/net/memif/rte_eth_memif.c
> index c1c7e9f..a19c0f3 100644
> --- a/drivers/net/memif/rte_eth_memif.c
> +++ b/drivers/net/memif/rte_eth_memif.c
> @@ -253,7 +253,12 @@ memif_free_stored_mbufs(struct
> pmd_process_private *proc_private, struct memif_q
>       memif_ring_t *ring = memif_get_ring_from_queue(proc_private,
> mq);
> 
>       /* FIXME: improve performance */
> -     while (mq->last_tail != ring->tail) {
> +     /* The ring->tail acts as a guard variable between Tx and Rx
> +      * threads, so using load-acquire pairs with store-release
> +      * to synchronize it between threads.
> +      */
> +     while (mq->last_tail != __atomic_load_n(&ring->tail,
> +                                             __ATOMIC_ACQUIRE)) {
>               RTE_MBUF_PREFETCH_TO_FREE(mq->buffers[(mq->last_tail
> + 1) & mask]);
>               /* Decrement refcnt and free mbuf. (current segment) */
>               rte_mbuf_refcnt_update(mq->buffers[mq->last_tail & mask],
> -1);
> @@ -455,7 +460,11 @@ eth_memif_rx_zc(void *queue, struct rte_mbuf
> **bufs, uint16_t nb_pkts)
>       mask = ring_size - 1;
> 
>       cur_slot = mq->last_tail;
> -     last_slot = ring->tail;
> +     /* The ring->tail acts as a guard variable between Tx and Rx
> +      * threads, so using load-acquire pairs with store-release
> +      * to synchronize it between threads.
> +      */
> +     last_slot = __atomic_load_n(&ring->tail, __ATOMIC_ACQUIRE);
>       if (cur_slot == last_slot)
>               goto refill;
>       n_slots = last_slot - cur_slot;
> @@ -501,7 +510,11 @@ eth_memif_rx_zc(void *queue, struct rte_mbuf
> **bufs, uint16_t nb_pkts)
> 
>  /* Supply master with new buffers */
>  refill:
> -     head = ring->head;
> +     /* The ring->head acts as a guard variable between Tx and Rx
> +      * threads, so using load-acquire pairs with store-release
> +      * to synchronize it between threads.
> +      */
> +     head = __atomic_load_n(&ring->head, __ATOMIC_ACQUIRE);
>       n_slots = ring_size - head + mq->last_tail;
> 
>       if (n_slots < 32)
> @@ -526,8 +539,7 @@ eth_memif_rx_zc(void *queue, struct rte_mbuf
> **bufs, uint16_t nb_pkts)
>                       (uint8_t *)proc_private->regions[d0->region]->addr;
>       }
>  no_free_mbufs:
> -     rte_mb();
> -     ring->head = head;
> +     __atomic_store_n(&ring->head, head, __ATOMIC_RELEASE);
> 
>       mq->n_pkts += n_rx_pkts;
> 
> @@ -723,8 +735,12 @@ eth_memif_tx_zc(void *queue, struct rte_mbuf
> **bufs, uint16_t nb_pkts)
>       memif_free_stored_mbufs(proc_private, mq);
> 
>       /* ring type always MEMIF_RING_S2M */
> -     slot = ring->head;
> -     n_free = ring_size - ring->head + mq->last_tail;
> +     /* The ring->head acts as a guard variable between Tx and Rx
> +      * threads, so using load-acquire pairs with store-release
> +      * to synchronize it between threads.
> +      */
> +     slot = __atomic_load_n(&ring->head, __ATOMIC_ACQUIRE);
> +     n_free = ring_size - slot + mq->last_tail;
> 
>       int used_slots;
> 
> @@ -778,12 +794,11 @@ eth_memif_tx_zc(void *queue, struct rte_mbuf
> **bufs, uint16_t nb_pkts)
>       }
> 
>  no_free_slots:
> -     rte_mb();
>       /* update ring pointers */
>       if (type == MEMIF_RING_S2M)
> -             ring->head = slot;
> +             __atomic_store_n(&ring->head, slot, __ATOMIC_RELEASE);
>       else
> -             ring->tail = slot;
> +             __atomic_store_n(&ring->tail, slot, __ATOMIC_RELEASE);
> 
>       /* Send interrupt, if enabled. */
>       if ((ring->flags & MEMIF_RING_FLAG_MASK_INT) == 0) {
> --
> 2.7.4

Reply via email to