If VIRTIO_F_ORDER_PLATFORM (bit 36) is not negotiated, the frontend
and backend are assumed to be implemented in software, i.e. they can
run on identical CPUs in an SMP configuration. Thus a weak form of
memory barriers, such as rte_smp_r/wmb rather than rte_cio_r/wmb, is
sufficient for this case (vq->hw->weak_barriers == 1) and yields
better performance. For this case, the patch improves performance
further by replacing the two-way barriers with C11 one-way barriers.
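
For illustration only, a minimal sketch of the release pattern on the
enqueue side (the struct and function names below are made up for the
example and are not taken from the driver): the descriptor content is
written with plain stores, then a store-release on the flags publishes
it, replacing the previous "wmb(); plain store" pair.

#include <stdint.h>

struct ex_desc {
	uint64_t addr;   /* descriptor content */
	uint32_t len;
	uint16_t id;
	uint16_t flags;  /* availability is signalled here */
};

/* Writer: the release store orders all the content stores above it,
 * so a reader that load-acquires d->flags will see them.
 */
static void ex_publish(struct ex_desc *d, uint64_t addr, uint32_t len,
		       uint16_t flags)
{
	d->addr = addr;  /* plain stores of the content ... */
	d->len = len;
	__atomic_store_n(&d->flags, flags, __ATOMIC_RELEASE);
}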

Meanwhile, a read barrier is required to ensure ordering between a
descriptor's flags and its content reads [1]. With C11, a load-acquire
can enforce that ordering instead of an rmb barrier.

[1]https://patchwork.dpdk.org/patch/49109/
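
The matching consume side, sketched with the same made-up struct as
above: the acquire load of the flags pairs with the writer's release
store, so the content reads that follow cannot observe stale data.
(The real packed-ring check compares the AVAIL/USED bits against a
wrap counter, as desc_is_avail does in the diff below; avail_mask here
is a simplification.)

/* Reader: only touch the content after an acquire load of the flags
 * says the descriptor is available.
 */
static int ex_consume(struct ex_desc *d, uint16_t avail_mask,
		      uint64_t *addr, uint32_t *len)
{
	uint16_t flags = __atomic_load_n(&d->flags, __ATOMIC_ACQUIRE);

	if (!(flags & avail_mask))
		return 0;        /* not available yet */
	*addr = d->addr;         /* ordered after the acquire load */
	*len = d->len;
	return 1;
}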

Signed-off-by: Joyce Kong <joyce.k...@arm.com>
Reviewed-by: Gavin Hu <gavin...@arm.com>
Reviewed-by: Phil Yang <phil.y...@arm.com>
---
 drivers/net/virtio/virtio_rxtx.c                 | 26 ++++++++++++++++++------
 drivers/net/virtio/virtio_user/virtio_user_dev.c |  6 +++++-
 lib/librte_vhost/vhost.h                         |  2 +-
 lib/librte_vhost/virtio_net.c                    | 11 +++++-----
 4 files changed, 31 insertions(+), 14 deletions(-)

diff --git a/drivers/net/virtio/virtio_rxtx.c b/drivers/net/virtio/virtio_rxtx.c
index 27ead19..2a2153c 100644
--- a/drivers/net/virtio/virtio_rxtx.c
+++ b/drivers/net/virtio/virtio_rxtx.c
@@ -456,8 +456,14 @@ virtqueue_enqueue_recv_refill_packed(struct virtqueue *vq,
                vq->vq_desc_head_idx = dxp->next;
                if (vq->vq_desc_head_idx == VQ_RING_DESC_CHAIN_END)
                        vq->vq_desc_tail_idx = vq->vq_desc_head_idx;
-               virtio_wmb(hw->weak_barriers);
-               start_dp[idx].flags = flags;
+
+               if (hw->weak_barriers)
+                       __atomic_store_n(&start_dp[idx].flags, flags,
+                                        __ATOMIC_RELEASE);
+               else {
+                       rte_cio_wmb();
+                       start_dp[idx].flags = flags;
+               }
                if (++vq->vq_avail_idx >= vq->vq_nentries) {
                        vq->vq_avail_idx -= vq->vq_nentries;
                        vq->vq_packed.cached_flags ^=
@@ -671,8 +677,12 @@ virtqueue_enqueue_xmit_packed_fast(struct virtnet_tx *txvq,
                        vq->vq_desc_tail_idx = VQ_RING_DESC_CHAIN_END;
        }
 
-       virtio_wmb(vq->hw->weak_barriers);
-       dp->flags = flags;
+       if (vq->hw->weak_barriers)
+               __atomic_store_n(&dp->flags, flags, __ATOMIC_RELEASE);
+       else {
+               rte_cio_wmb();
+               dp->flags = flags;
+       }
 }
 
 static inline void
@@ -763,8 +773,12 @@ virtqueue_enqueue_xmit_packed(struct virtnet_tx *txvq, struct rte_mbuf *cookie,
                        vq->vq_desc_tail_idx = VQ_RING_DESC_CHAIN_END;
        }
 
-       virtio_wmb(vq->hw->weak_barriers);
-       head_dp->flags = head_flags;
+       if (vq->hw->weak_barriers)
+               __atomic_store_n(&head_dp->flags, head_flags, __ATOMIC_RELEASE);
+       else {
+               rte_cio_wmb();
+               head_dp->flags = head_flags;
+       }
 }
 
 static inline void
diff --git a/drivers/net/virtio/virtio_user/virtio_user_dev.c b/drivers/net/virtio/virtio_user/virtio_user_dev.c
index fab87eb..7911c39 100644
--- a/drivers/net/virtio/virtio_user/virtio_user_dev.c
+++ b/drivers/net/virtio/virtio_user/virtio_user_dev.c
@@ -624,7 +624,7 @@ virtio_user_handle_ctrl_msg(struct virtio_user_dev *dev, struct vring *vring,
 static inline int
 desc_is_avail(struct vring_packed_desc *desc, bool wrap_counter)
 {
-       uint16_t flags = desc->flags;
+       uint16_t flags = __atomic_load_n(&desc->flags, __ATOMIC_ACQUIRE);
 
        return wrap_counter == !!(flags & VRING_PACKED_DESC_F_AVAIL) &&
                wrap_counter != !!(flags & VRING_PACKED_DESC_F_USED);
@@ -684,6 +684,10 @@ virtio_user_handle_cq_packed(struct virtio_user_dev *dev, uint16_t queue_idx)
        struct vring_packed *vring = &dev->packed_vrings[queue_idx];
        uint16_t n_descs, flags;
 
+       /* Perform a load-acquire barrier in desc_is_avail to
+        * enforce the ordering between desc flags and desc
+        * content.
+        */
        while (desc_is_avail(&vring->desc[vq->used_idx],
                             vq->used_wrap_counter)) {
 
diff --git a/lib/librte_vhost/vhost.h b/lib/librte_vhost/vhost.h
index 884befa..d294ed1 100644
--- a/lib/librte_vhost/vhost.h
+++ b/lib/librte_vhost/vhost.h
@@ -344,7 +344,7 @@ vq_is_packed(struct virtio_net *dev)
 static inline bool
 desc_is_avail(struct vring_packed_desc *desc, bool wrap_counter)
 {
-       uint16_t flags = *((volatile uint16_t *) &desc->flags);
+       uint16_t flags = __atomic_load_n(&desc->flags, __ATOMIC_ACQUIRE);
 
        return wrap_counter == !!(flags & VRING_DESC_F_AVAIL) &&
                wrap_counter != !!(flags & VRING_DESC_F_USED);
diff --git a/lib/librte_vhost/virtio_net.c b/lib/librte_vhost/virtio_net.c
index 5b85b83..e7463ff 100644
--- a/lib/librte_vhost/virtio_net.c
+++ b/lib/librte_vhost/virtio_net.c
@@ -503,14 +503,13 @@ fill_vec_buf_packed(struct virtio_net *dev, struct vhost_virtqueue *vq,
        if (avail_idx < vq->last_avail_idx)
                wrap_counter ^= 1;
 
-       if (unlikely(!desc_is_avail(&descs[avail_idx], wrap_counter)))
-               return -1;
-
        /*
-        * The ordering between desc flags and desc
-        * content reads need to be enforced.
+        * Perform a load-acquire barrier in desc_is_avail to
+        * enforce the ordering between desc flags and desc
+        * content.
         */
-       rte_smp_rmb();
+       if (unlikely(!desc_is_avail(&descs[avail_idx], wrap_counter)))
+               return -1;
 
        *desc_count = 0;
        *len = 0;
-- 
2.7.4
