This patch implements the asynchronous dequeue data path for packed ring.

Signed-off-by: Cheng Jiang <cheng1.ji...@intel.com>
---
It's based on these 2 patches:
1. vhost: remove copy threshold for async vhost
http://patches.dpdk.org/project/dpdk/patch/1629463466-450012-1-git-send-email-jiayu...@intel.com/
2. vhost: support async dequeue for split ring
http://patches.dpdk.org/project/dpdk/patch/20210906204837.112466-2-wenwux...@intel.com/
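For context, a minimal polling sketch of how an application might consume this
data path, assuming the rte_vhost_async_try_dequeue_burst() prototype
introduced by the split-ring patch above (vid, queue_id, mbuf_pool and
MAX_PKT_BURST are placeholders supplied by the application, not part of this
patch):

    #include <rte_mbuf.h>
    #include <rte_vhost_async.h>

    #define MAX_PKT_BURST 32

    static void
    poll_async_dequeue(int vid, uint16_t queue_id, struct rte_mempool *mbuf_pool)
    {
            struct rte_mbuf *pkts[MAX_PKT_BURST];
            int nr_inflight;
            uint16_t i, nr_recvd;

            /* Submit new async copies and retrieve packets whose copies have
             * completed; with this patch the same call covers split and
             * packed rings.
             */
            nr_recvd = rte_vhost_async_try_dequeue_burst(vid, queue_id,
                            mbuf_pool, pkts, MAX_PKT_BURST, &nr_inflight);

            for (i = 0; i < nr_recvd; i++)
                    rte_pktmbuf_free(pkts[i]); /* or forward the mbufs */
    }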

 lib/vhost/virtio_net.c | 321 ++++++++++++++++++++++++++++++++++++++---
 1 file changed, 298 insertions(+), 23 deletions(-)

diff --git a/lib/vhost/virtio_net.c b/lib/vhost/virtio_net.c
index e0159b53e3..e2867a3e0e 100644
--- a/lib/vhost/virtio_net.c
+++ b/lib/vhost/virtio_net.c
@@ -1654,7 +1654,7 @@ virtio_dev_rx_async_submit_split(struct virtio_net *dev,
 }

 static __rte_always_inline void
-vhost_update_used_packed(struct vhost_virtqueue *vq,
+vhost_enqueue_update_used_packed(struct vhost_virtqueue *vq,
                        struct vring_used_elem_packed *shadow_ring,
                        uint16_t count)
 {
@@ -1970,22 +1970,66 @@ write_back_completed_descs_split(struct vhost_virtqueue *vq, uint16_t n_descs)
        } while (nr_left > 0);
 }

+static __rte_always_inline void
+vhost_dequeue_update_used_packed(struct vhost_virtqueue *vq,
+                       struct vring_used_elem_packed *shadow_ring,
+                       uint16_t count)
+{
+       uint16_t i;
+       uint16_t flags;
+       uint16_t head_idx = vq->last_used_idx;
+       uint16_t head_flags = 0;
+
+       for (i = 0; i < count; i++)
+               vq->desc_packed[vq->last_used_idx + i].id = shadow_ring[i].id;
+
+       /* The ordering for storing desc flags needs to be enforced. */
+       rte_atomic_thread_fence(__ATOMIC_RELEASE);
+
+       for (i = 0; i < count; i++) {
+               flags = vq->desc_packed[vq->last_used_idx].flags;
+               if (vq->used_wrap_counter) {
+                       flags |= VRING_DESC_F_USED;
+                       flags |= VRING_DESC_F_AVAIL;
+               } else {
+                       flags &= ~VRING_DESC_F_USED;
+                       flags &= ~VRING_DESC_F_AVAIL;
+               }
+
+               if (i > 0)
+                       vq->desc_packed[vq->last_used_idx].flags = flags;
+               else
+                       head_flags = flags;
+
+               vq_inc_last_used_packed(vq, 1);
+       }
+
+       vq->desc_packed[head_idx].flags = head_flags;
+}
+
 static __rte_always_inline void
 write_back_completed_descs_packed(struct vhost_virtqueue *vq,
-                               uint16_t n_buffers)
+                               uint16_t n_buffers, bool is_txq)
 {
        uint16_t nr_left = n_buffers;
        uint16_t from, to;
+       void (*update_used_packed) (struct vhost_virtqueue *vq,
+                               struct vring_used_elem_packed *shadow_ring, uint16_t count);
+
+       if (is_txq)
+               update_used_packed = vhost_enqueue_update_used_packed;
+       else
+               update_used_packed = vhost_dequeue_update_used_packed;

        do {
                from = vq->last_async_buffer_idx_packed;
                to = (from + nr_left) % vq->size;
                if (to > from) {
-                       vhost_update_used_packed(vq, vq->async_buffers_packed + from, to - from);
+                       update_used_packed(vq, vq->async_buffers_packed + from, to - from);
                        vq->last_async_buffer_idx_packed += nr_left;
                        nr_left = 0;
                } else {
-                       vhost_update_used_packed(vq, vq->async_buffers_packed + from,
+                       update_used_packed(vq, vq->async_buffers_packed + from,
                                vq->size - from);
                        vq->last_async_buffer_idx_packed = 0;
                        nr_left -= vq->size - from;
@@ -2049,7 +2093,7 @@ vhost_poll_enqueue_completed(struct virtio_net *dev, uint16_t queue_id,

        if (likely(vq->enabled && vq->access_ok)) {
                if (vq_is_packed(dev)) {
-                       write_back_completed_descs_packed(vq, n_buffers);
+                       write_back_completed_descs_packed(vq, n_buffers, 1);

                        vhost_vring_call_packed(dev, vq);
                } else {
@@ -3328,7 +3372,7 @@ async_desc_to_mbuf(struct virtio_net *dev,
 }

 static __rte_always_inline uint16_t
-async_poll_dequeue_completed_split(struct virtio_net *dev,
+async_poll_dequeue_completed(struct virtio_net *dev,
                struct vhost_virtqueue *vq, uint16_t queue_id,
                struct rte_mbuf **pkts, uint16_t count, bool legacy_ol_flags)
 {
@@ -3336,7 +3380,7 @@ async_poll_dequeue_completed_split(struct virtio_net *dev,
        uint16_t start_idx, pkt_idx, from;
        struct async_inflight_info *pkts_info;

-       pkt_idx = vq->async_pkts_idx & (vq->size - 1);
+       pkt_idx = vq->async_pkts_idx % vq->size;
        pkts_info = vq->async_pkts_info;
        start_idx = virtio_dev_rx_async_get_info_idx(pkt_idx, vq->size,
                        vq->async_pkts_inflight_n);
@@ -3360,7 +3404,7 @@ async_poll_dequeue_completed_split(struct virtio_net *dev,
        n_pkts_put = RTE_MIN(count, n_pkts_cpl);

        for (pkt_idx = 0; pkt_idx < n_pkts_put; pkt_idx++) {
-               from = (start_idx + pkt_idx) & (vq->size - 1);
+               from = (start_idx + pkt_idx) % vq->size;
                pkts[pkt_idx] = pkts_info[from].mbuf;

                if (pkts_info[from].nethdr.valid) {
@@ -3370,9 +3414,14 @@ async_poll_dequeue_completed_split(struct virtio_net *dev,
        }

        /* write back completed descs to used ring and update used idx */
-       write_back_completed_descs_split(vq, n_pkts_put);
-       __atomic_add_fetch(&vq->used->idx, n_pkts_put, __ATOMIC_RELEASE);
-       vhost_vring_call_split(dev, vq);
+       if (vq_is_packed(dev)) {
+               write_back_completed_descs_packed(vq, n_pkts_put, 0);
+               vhost_vring_call_packed(dev, vq);
+       } else {
+               write_back_completed_descs_split(vq, n_pkts_put);
+               __atomic_add_fetch(&vq->used->idx, n_pkts_put, __ATOMIC_RELEASE);
+               vhost_vring_call_split(dev, vq);
+       }

        vq->async_last_pkts_n = n_pkts_cpl - n_pkts_put;
        vq->async_pkts_inflight_n -= n_pkts_put;
@@ -3554,10 +3603,9 @@ virtio_dev_tx_async_split(struct virtio_net *dev,
        vq->async_pkts_idx += pkt_idx;

 out:
-       if (vq->async_pkts_inflight_n > 0) {
-               nr_done_pkts = async_poll_dequeue_completed_split(dev, vq,
-                                       queue_id, pkts, count, legacy_ol_flags);
-       }
+       if (vq->async_pkts_inflight_n > 0)
+               nr_done_pkts = async_poll_dequeue_completed(dev, vq, queue_id, pkts,
+                                       count, legacy_ol_flags);

        return nr_done_pkts;
 }
@@ -3584,6 +3632,226 @@ virtio_dev_tx_async_split_compliant(struct virtio_net *dev,
                                pkts, count, false);
 }

+static __rte_always_inline void
+vhost_async_shadow_dequeue_single_packed(struct vhost_virtqueue *vq,
+                                  uint16_t buf_id)
+{
+       uint16_t idx = vq->async_buffer_idx_packed;
+
+       vq->async_buffers_packed[idx].id  = buf_id;
+       vq->async_buffers_packed[idx].len = 0;
+
+       vq->async_buffer_idx_packed++;
+       if (vq->async_buffer_idx_packed >= vq->size)
+               vq->async_buffer_idx_packed -= vq->size;
+
+}
+
+static __rte_always_inline int
+virtio_dev_tx_async_single_packed(struct virtio_net *dev,
+                       struct vhost_virtqueue *vq,
+                       struct rte_mempool *mbuf_pool,
+                       struct rte_mbuf *pkts,
+                       struct iovec *src_iovec, struct iovec *dst_iovec,
+                       struct rte_vhost_iov_iter *src_it,
+                       struct rte_vhost_iov_iter *dst_it,
+                       struct async_nethdr *nethdr,
+                       int nr_iovec)
+{
+       int err;
+       uint16_t buf_id, desc_count = 0;
+       uint16_t nr_vec = 0;
+       uint32_t buf_len;
+       struct buf_vector buf_vec[BUF_VECTOR_MAX];
+       static bool allocerr_warned;
+
+       if (unlikely(fill_vec_buf_packed(dev, vq,
+                                        vq->last_avail_idx, &desc_count,
+                                        buf_vec, &nr_vec,
+                                        &buf_id, &buf_len,
+                                        VHOST_ACCESS_RO) < 0))
+               return -1;
+
+       if (unlikely(virtio_dev_pktmbuf_prep(dev, pkts, buf_len))) {
+               if (!allocerr_warned) {
+                       VHOST_LOG_DATA(ERR,
+                               "Failed mbuf alloc of size %d from %s on %s.\n",
+                               buf_len, mbuf_pool->name, dev->ifname);
+                       allocerr_warned = true;
+               }
+               return -1;
+       }
+
+       err = async_desc_to_mbuf(dev, buf_vec, nr_vec, pkts, mbuf_pool,
+                               src_iovec, dst_iovec, src_it, dst_it, nethdr, nr_iovec);
+       if (unlikely(err)) {
+               rte_pktmbuf_free(pkts);
+               if (!allocerr_warned) {
+                       VHOST_LOG_DATA(ERR,
+                               "Failed to copy desc to mbuf on %s.\n",
+                               dev->ifname);
+                       allocerr_warned = true;
+               }
+               return -1;
+       }
+
+       /* update async shadow packed ring */
+       vhost_async_shadow_dequeue_single_packed(vq, buf_id);
+
+       return err;
+}
+
+static __rte_always_inline uint16_t
+virtio_dev_tx_async_packed(struct virtio_net *dev,
+               struct vhost_virtqueue *vq, uint16_t queue_id,
+               struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts,
+               uint16_t count, bool legacy_ol_flags)
+{
+       uint16_t pkt_idx;
+       uint16_t nr_async_burst = 0;
+       uint16_t slot_idx = 0;
+       uint16_t nr_done_pkts = 0;
+       uint16_t pkt_err = 0;
+       uint16_t iovec_idx = 0, it_idx = 0;
+       struct rte_vhost_iov_iter *it_pool = vq->it_pool;
+       struct iovec *vec_pool = vq->vec_pool;
+       struct iovec *src_iovec = vec_pool;
+       struct iovec *dst_iovec = vec_pool + (VHOST_MAX_ASYNC_VEC >> 1);
+       struct rte_vhost_async_desc tdes[MAX_PKT_BURST];
+       struct async_inflight_info *pkts_info = vq->async_pkts_info;
+       struct rte_mbuf *pkts_prealloc[MAX_PKT_BURST];
+
+       VHOST_LOG_DATA(DEBUG, "(%d) about to dequeue %u buffers\n",
+                       dev->vid, count);
+
+       if (rte_pktmbuf_alloc_bulk(mbuf_pool, pkts_prealloc, count))
+               goto out;
+
+       for (pkt_idx = 0; pkt_idx < count; pkt_idx++) {
+               struct rte_mbuf *pkt = pkts_prealloc[pkt_idx];
+
+               rte_prefetch0(&vq->desc_packed[vq->last_avail_idx]);
+
+               slot_idx = (vq->async_pkts_idx + pkt_idx) % vq->size;
+               if (unlikely(virtio_dev_tx_async_single_packed(dev, vq, mbuf_pool, pkt,
+                               &src_iovec[iovec_idx], &dst_iovec[iovec_idx], &it_pool[it_idx],
+                               &it_pool[it_idx + 1], &pkts_info[slot_idx].nethdr,
+                               (VHOST_MAX_ASYNC_VEC >> 1) - iovec_idx))) {
+                       rte_pktmbuf_free_bulk(&pkts_prealloc[pkt_idx], count - pkt_idx);
+                       break;
+               }
+
+               async_fill_desc(&tdes[nr_async_burst], &it_pool[it_idx], &it_pool[it_idx + 1]);
+               pkts_info[slot_idx].mbuf = pkt;
+               nr_async_burst++;
+
+               iovec_idx += it_pool[it_idx].nr_segs;
+               it_idx += 2;
+
+               vq_inc_last_avail_packed(vq, 1);
+
+               if (unlikely((nr_async_burst >= VHOST_ASYNC_BATCH_THRESHOLD) ||
+                               ((VHOST_MAX_ASYNC_VEC >> 1) - iovec_idx < BUF_VECTOR_MAX))) {
+                       uint16_t nr_pkts;
+                       int32_t ret;
+
+                       ret = vq->async_ops.transfer_data(dev->vid, queue_id,
+                                       tdes, 0, nr_async_burst);
+                       if (unlikely(ret < 0)) {
+                               VHOST_LOG_DATA(ERR, "(%d) async channel submit error\n", dev->vid);
+                               ret = 0;
+                       }
+                       nr_pkts = ret;
+
+                       vq->async_pkts_inflight_n += nr_pkts;
+                       it_idx = 0;
+                       iovec_idx = 0;
+
+                       if (unlikely(nr_pkts < nr_async_burst)) {
+                               pkt_err = nr_async_burst - nr_pkts;
+                               nr_async_burst = 0;
+                               pkt_idx++;
+                               break;
+                       }
+                       nr_async_burst = 0;
+               }
+       }
+
+       if (nr_async_burst) {
+               uint16_t nr_pkts;
+               int32_t ret;
+
+               ret = vq->async_ops.transfer_data(dev->vid, queue_id, tdes, 0, nr_async_burst);
+               if (unlikely(ret < 0)) {
+                       VHOST_LOG_DATA(ERR, "(%d) async channel submit error\n", dev->vid);
+                       ret = 0;
+               }
+               nr_pkts = ret;
+
+               vq->async_pkts_inflight_n += nr_pkts;
+
+               if (unlikely(nr_pkts < nr_async_burst))
+                       pkt_err = nr_async_burst - nr_pkts;
+       }
+
+       if (unlikely(pkt_err)) {
+               uint16_t nr_err_dma = pkt_err;
+
+               pkt_idx -= nr_err_dma;
+
+               /**
+                * recover DMA-copy related structures and free pktmbufs
+                * for DMA-error pkts.
+                */
+               vq->async_buffer_idx_packed -= nr_err_dma;
+               while (nr_err_dma-- > 0) {
+                       rte_pktmbuf_free(pkts_info[slot_idx % vq->size].mbuf);
+                       slot_idx--;
+               }
+
+               /* recover available ring */
+               if (vq->last_avail_idx >= pkt_err) {
+                       vq->last_avail_idx -= pkt_err;
+               } else {
+                       vq->last_avail_idx = vq->size + vq->last_avail_idx - pkt_err;
+                       vq->avail_wrap_counter ^= 1;
+               }
+       }
+
+       vq->async_pkts_idx += pkt_idx;
+       if (vq->async_pkts_idx >= vq->size)
+               vq->async_pkts_idx -= vq->size;
+
+out:
+       if (vq->async_pkts_inflight_n > 0)
+               nr_done_pkts = async_poll_dequeue_completed(dev, vq,
+                                       queue_id, pkts, count, legacy_ol_flags);
+
+       return nr_done_pkts;
+}
+
+__rte_noinline
+static uint16_t
+virtio_dev_tx_async_packed_legacy(struct virtio_net *dev,
+               struct vhost_virtqueue *vq, uint16_t queue_id,
+               struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts,
+               uint16_t count)
+{
+       return virtio_dev_tx_async_packed(dev, vq, queue_id, mbuf_pool,
+                               pkts, count, true);
+}
+
+__rte_noinline
+static uint16_t
+virtio_dev_tx_async_packed_compliant(struct virtio_net *dev,
+               struct vhost_virtqueue *vq, uint16_t queue_id,
+               struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts,
+               uint16_t count)
+{
+       return virtio_dev_tx_async_packed(dev, vq, queue_id, mbuf_pool,
+                               pkts, count, false);
+}
+
 uint16_t
 rte_vhost_async_try_dequeue_burst(int vid, uint16_t queue_id,
        struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count,
@@ -3669,15 +3937,22 @@ rte_vhost_async_try_dequeue_burst(int vid, uint16_t queue_id,
                count -= 1;
        }

-       if (unlikely(vq_is_packed(dev)))
-               return 0;
+       if (vq_is_packed(dev)) {
+               if (dev->flags & VIRTIO_DEV_LEGACY_OL_FLAGS)
+                       count = virtio_dev_tx_async_packed_legacy(dev, vq, queue_id,
+                                       mbuf_pool, pkts, count);
+               else
+                       count = virtio_dev_tx_async_packed_compliant(dev, vq, queue_id,
+                                       mbuf_pool, pkts, count);
+       } else {
+               if (dev->flags & VIRTIO_DEV_LEGACY_OL_FLAGS)
+                       count = virtio_dev_tx_async_split_legacy(dev, vq, queue_id,
+                                       mbuf_pool, pkts, count);
+               else
+                       count = virtio_dev_tx_async_split_compliant(dev, vq, queue_id,
+                                       mbuf_pool, pkts, count);
+       }

-       if (dev->flags & VIRTIO_DEV_LEGACY_OL_FLAGS)
-               count = virtio_dev_tx_async_split_legacy(dev, vq, queue_id,
-                               mbuf_pool, pkts, count);
-       else
-               count = virtio_dev_tx_async_split_compliant(dev, vq, queue_id,
-                               mbuf_pool, pkts, count);

 out:
        *nr_inflight = vq->async_pkts_inflight_n;
--
2.32.0
