> > On 9/29/20 11:29 AM, Patrick Fu wrote:
> > The current async ops allow the check_completed_copies() callback to
> > return an arbitrary number of async iov segments finished by backend
> > async devices. This design makes it complex for vhost to handle a
> > broken transfer of a single packet (i.e. a transfer that completes in
> > the middle of an async descriptor) and prevents application callbacks
> > from leveraging hardware capability to offload the work. Thus, this
> > patch requires the check_completed_copies() callback to return the
> > number of async memory descriptors, which is aligned with the async
> > transfer data ops callbacks. The vhost async data path is revised to
> > work with the new ops definition, which provides clean and simplified
> > processing.
> >
> > Signed-off-by: Patrick Fu <patrick...@intel.com>
> > ---
> >  lib/librte_vhost/rte_vhost_async.h |  15 ++-
> >  lib/librte_vhost/vhost.c           |  20 ++--
> >  lib/librte_vhost/vhost.h           |   8 +-
> >  lib/librte_vhost/vhost_user.c      |   6 +-
> >  lib/librte_vhost/virtio_net.c      | 151 ++++++++++++-----------------
> >  5 files changed, 90 insertions(+), 110 deletions(-)
> >
> > diff --git a/lib/librte_vhost/rte_vhost_async.h b/lib/librte_vhost/rte_vhost_async.h
> > index cb6284539..c73bd7c99 100644
> > --- a/lib/librte_vhost/rte_vhost_async.h
> > +++ b/lib/librte_vhost/rte_vhost_async.h
> > @@ -76,13 +76,26 @@ struct rte_vhost_async_channel_ops {
> >      * @param max_packets
> >      *  max number of packets could be completed
> >      * @return
> > -    *  number of iov segments completed
> > +    *  number of async descs completed
> >      */
> >     uint32_t (*check_completed_copies)(int vid, uint16_t queue_id,
> >             struct rte_vhost_async_status *opaque_data,
> >             uint16_t max_packets);
> >  };
> >
> > +/**
> > + * inflight async packet information
> > + */
> > +struct async_inflight_info {
> > +   union {
> > +           uint32_t info;
> > +           struct {
> > +                   uint16_t descs; /* num of descs inflight */
> > +                   uint16_t segs; /* iov segs inflight */
> > +           };
> > +   };
> 
> I think you can remove the union.
> 
There are some cases in the code where we want to set descs=N and segs=0. In
such cases, setting info=N directly is faster than setting descs & segs
separately, so I would prefer to keep the union.
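To illustrate the point, a minimal standalone sketch (not part of the patch,
and assuming a little-endian target where descs maps to the low 16 bits of
info, so writing info = N is a single store that leaves descs = N and
segs = 0):

#include <stdint.h>
#include <stdio.h>

/* Same layout as the proposed struct async_inflight_info. */
struct async_inflight_info {
	union {
		uint32_t info;
		struct {
			uint16_t descs; /* num of descs inflight */
			uint16_t segs;  /* iov segs inflight */
		};
	};
};

int main(void)
{
	struct async_inflight_info pi;

	/* One 32-bit store: descs = 5, segs = 0 on little-endian. */
	pi.info = 5;
	printf("descs=%u segs=%u\n", pi.descs, pi.segs);
	return 0;
}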

> > +};
> > +
> >  /**
> >   *  dma channel feature bit definition
> >   */
> > diff --git a/lib/librte_vhost/vhost.c b/lib/librte_vhost/vhost.c
> > index 8f20a0818..eca507836 100644
> > --- a/lib/librte_vhost/vhost.c
> > +++ b/lib/librte_vhost/vhost.c
> > @@ -333,8 +333,8 @@ free_vq(struct virtio_net *dev, struct vhost_virtqueue *vq)
> >             rte_free(vq->shadow_used_split);
> >             if (vq->async_pkts_pending)
> >                     rte_free(vq->async_pkts_pending);
> > -           if (vq->async_pending_info)
> > -                   rte_free(vq->async_pending_info);
> > +           if (vq->async_pkts_info)
> > +                   rte_free(vq->async_pkts_info);
> >     }
> >     rte_free(vq->batch_copy_elems);
> >     rte_mempool_free(vq->iotlb_pool);
> > @@ -1573,15 +1573,15 @@ int rte_vhost_async_channel_register(int vid, uint16_t queue_id,
> >     vq->async_pkts_pending = rte_malloc(NULL,
> >                     vq->size * sizeof(uintptr_t),
> >                     RTE_CACHE_LINE_SIZE);
> > -   vq->async_pending_info = rte_malloc(NULL,
> > -                   vq->size * sizeof(uint64_t),
> > +   vq->async_pkts_info = rte_malloc(NULL,
> > +                   vq->size * sizeof(struct async_inflight_info),
> >                     RTE_CACHE_LINE_SIZE);
> > -   if (!vq->async_pkts_pending || !vq->async_pending_info) {
> > +   if (!vq->async_pkts_pending || !vq->async_pkts_info) {
> >             if (vq->async_pkts_pending)
> >                     rte_free(vq->async_pkts_pending);
> >
> > -           if (vq->async_pending_info)
> > -                   rte_free(vq->async_pending_info);
> > +           if (vq->async_pkts_info)
> > +                   rte_free(vq->async_pkts_info);
> >
> >             VHOST_LOG_CONFIG(ERR,
> >                             "async register failed: cannot allocate memory for vq data "
> > @@ -1635,9 +1635,9 @@ int rte_vhost_async_channel_unregister(int vid, uint16_t queue_id)
> >             vq->async_pkts_pending = NULL;
> >     }
> >
> > -   if (vq->async_pending_info) {
> > -           rte_free(vq->async_pending_info);
> > -           vq->async_pending_info = NULL;
> > +   if (vq->async_pkts_info) {
> > +           rte_free(vq->async_pkts_info);
> > +           vq->async_pkts_info = NULL;
> >     }
> >
> >     vq->async_ops.transfer_data = NULL;
> > diff --git a/lib/librte_vhost/vhost.h b/lib/librte_vhost/vhost.h
> > index 73a1ed889..155a832c1 100644
> > --- a/lib/librte_vhost/vhost.h
> > +++ b/lib/librte_vhost/vhost.h
> > @@ -46,8 +46,6 @@
> >
> >  #define MAX_PKT_BURST 32
> >
> > -#define ASYNC_MAX_POLL_SEG 255
> > -
> >  #define VHOST_MAX_ASYNC_IT (MAX_PKT_BURST * 2)
> >  #define VHOST_MAX_ASYNC_VEC (BUF_VECTOR_MAX * 2)
> >
> > @@ -225,12 +223,10 @@ struct vhost_virtqueue {
> >
> >     /* async data transfer status */
> >     uintptr_t       **async_pkts_pending;
> > -   #define         ASYNC_PENDING_INFO_N_MSK 0xFFFF
> > -   #define         ASYNC_PENDING_INFO_N_SFT 16
> > -   uint64_t        *async_pending_info;
> > +   struct async_inflight_info *async_pkts_info;
> >     uint16_t        async_pkts_idx;
> >     uint16_t        async_pkts_inflight_n;
> > -   uint16_t        async_last_seg_n;
> > +   uint16_t        async_last_pkts_n;
> >
> >     /* vq async features */
> >     bool            async_inorder;
> > diff --git a/lib/librte_vhost/vhost_user.c b/lib/librte_vhost/vhost_user.c
> > index 501218e19..67d2a2d43 100644
> > --- a/lib/librte_vhost/vhost_user.c
> > +++ b/lib/librte_vhost/vhost_user.c
> > @@ -2006,10 +2006,10 @@ vhost_user_get_vring_base(struct virtio_net **pdev,
> >             vq->shadow_used_split = NULL;
> >             if (vq->async_pkts_pending)
> >                     rte_free(vq->async_pkts_pending);
> > -           if (vq->async_pending_info)
> > -                   rte_free(vq->async_pending_info);
> > +           if (vq->async_pkts_info)
> > +                   rte_free(vq->async_pkts_info);
> >             vq->async_pkts_pending = NULL;
> > -           vq->async_pending_info = NULL;
> > +           vq->async_pkts_info = NULL;
> >     }
> >
> >     rte_free(vq->batch_copy_elems);
> > diff --git a/lib/librte_vhost/virtio_net.c b/lib/librte_vhost/virtio_net.c
> > index bd9303c8a..68ead9a71 100644
> > --- a/lib/librte_vhost/virtio_net.c
> > +++ b/lib/librte_vhost/virtio_net.c
> > @@ -1473,37 +1473,6 @@ virtio_dev_rx_async_get_info_idx(uint16_t pkts_idx,
> >             (vq_size - n_inflight + pkts_idx) & (vq_size - 1);
> >  }
> >
> > -static __rte_always_inline void
> > -virtio_dev_rx_async_submit_split_err(struct virtio_net *dev,
> > -   struct vhost_virtqueue *vq, uint16_t queue_id,
> > -   uint16_t last_idx, uint16_t shadow_idx)
> > -{
> > -   uint16_t start_idx, pkts_idx, vq_size;
> > -   uint64_t *async_pending_info;
> > -
> > -   pkts_idx = vq->async_pkts_idx;
> > -   async_pending_info = vq->async_pending_info;
> > -   vq_size = vq->size;
> > -   start_idx = virtio_dev_rx_async_get_info_idx(pkts_idx,
> > -           vq_size, vq->async_pkts_inflight_n);
> > -
> > -   while (likely((start_idx & (vq_size - 1)) != pkts_idx)) {
> > -           uint64_t n_seg =
> > -                   async_pending_info[(start_idx) & (vq_size - 1)] >>
> > -                   ASYNC_PENDING_INFO_N_SFT;
> > -
> > -           while (n_seg)
> > -                   n_seg -= vq->async_ops.check_completed_copies(dev->vid,
> > -                           queue_id, 0, 1);
> > -   }
> > -
> > -   vq->async_pkts_inflight_n = 0;
> > -   vq->batch_copy_nb_elems = 0;
> > -
> > -   vq->shadow_used_idx = shadow_idx;
> > -   vq->last_avail_idx = last_idx;
> > -}
> > -
> >  static __rte_noinline uint32_t
> >  virtio_dev_rx_async_submit_split(struct virtio_net *dev,
> >     struct vhost_virtqueue *vq, uint16_t queue_id,
> > @@ -1512,7 +1481,7 @@ virtio_dev_rx_async_submit_split(struct virtio_net *dev,
> >     uint32_t pkt_idx = 0, pkt_burst_idx = 0;
> >     uint16_t num_buffers;
> >     struct buf_vector buf_vec[BUF_VECTOR_MAX];
> > -   uint16_t avail_head, last_idx, shadow_idx;
> > +   uint16_t avail_head;
> >
> >     struct rte_vhost_iov_iter *it_pool = vq->it_pool;
> >     struct iovec *vec_pool = vq->vec_pool;
> > @@ -1522,11 +1491,11 @@ virtio_dev_rx_async_submit_split(struct virtio_net *dev,
> >     struct rte_vhost_iov_iter *src_it = it_pool;
> >     struct rte_vhost_iov_iter *dst_it = it_pool + 1;
> >     uint16_t n_free_slot, slot_idx;
> > +   uint16_t pkt_err = 0;
> > +   struct async_inflight_info *pkts_info = vq->async_pkts_info;
> >     int n_pkts = 0;
> >
> >     avail_head = __atomic_load_n(&vq->avail->idx, __ATOMIC_ACQUIRE);
> > -   last_idx = vq->last_avail_idx;
> > -   shadow_idx = vq->shadow_used_idx;
> >
> >     /*
> >      * The ordering between avail index and
> > @@ -1565,14 +1534,14 @@ virtio_dev_rx_async_submit_split(struct virtio_net *dev,
> >             if (src_it->count) {
> >                     async_fill_desc(&tdes[pkt_burst_idx], src_it, dst_it);
> >                     pkt_burst_idx++;
> > -                   vq->async_pending_info[slot_idx] =
> > -                           num_buffers | (src_it->nr_segs << 16);
> > +                   pkts_info[slot_idx].descs = num_buffers;
> > +                   pkts_info[slot_idx].segs = src_it->nr_segs;
> >                     src_iovec += src_it->nr_segs;
> >                     dst_iovec += dst_it->nr_segs;
> >                     src_it += 2;
> >                     dst_it += 2;
> >             } else {
> > -                   vq->async_pending_info[slot_idx] = num_buffers;
> > +                   pkts_info[slot_idx].info = num_buffers;
> >                     vq->async_pkts_inflight_n++;
> >             }
> >
> > @@ -1586,35 +1555,46 @@ virtio_dev_rx_async_submit_split(struct virtio_net *dev,
> >                     dst_iovec = vec_pool + (VHOST_MAX_ASYNC_VEC >> 1);
> >                     src_it = it_pool;
> >                     dst_it = it_pool + 1;
> > +                   vq->async_pkts_inflight_n += n_pkts;
> >
> >                     if (unlikely(n_pkts < (int)pkt_burst_idx)) {
> > -                           vq->async_pkts_inflight_n +=
> > -                                   n_pkts > 0 ? n_pkts : 0;
> > -                           virtio_dev_rx_async_submit_split_err(dev,
> > -                                   vq, queue_id, last_idx, shadow_idx);
> > -                           return 0;
> > +                           /*
> > +                            * log error packets number here and do actual
> > +                            * error processing when applications poll
> > +                            * completion
> > +                            */
> > +                           pkt_err = pkt_burst_idx - n_pkts;
> > +                           pkt_burst_idx = 0;
> > +                           break;
> >                     }
> >
> >                     pkt_burst_idx = 0;
> > -                   vq->async_pkts_inflight_n += n_pkts;
> >             }
> >     }
> >
> >     if (pkt_burst_idx) {
> >             n_pkts = vq->async_ops.transfer_data(dev->vid,
> >                             queue_id, tdes, 0, pkt_burst_idx);
> > -           if (unlikely(n_pkts < (int)pkt_burst_idx)) {
> > -                   vq->async_pkts_inflight_n += n_pkts > 0 ? n_pkts : 0;
> > -                   virtio_dev_rx_async_submit_split_err(dev, vq, queue_id,
> > -                           last_idx, shadow_idx);
> > -                   return 0;
> > -           }
> > -
> >             vq->async_pkts_inflight_n += n_pkts;
> > +
> > +           if (unlikely(n_pkts < (int)pkt_burst_idx))
> > +                   pkt_err = pkt_burst_idx - n_pkts;
> >     }
> >
> >     do_data_copy_enqueue(dev, vq);
> >
> > +   if (unlikely(pkt_err)) {
> > +           do {
> > +                   if (pkts_info[slot_idx].segs)
> > +                           pkt_err--;
> > +                   vq->last_avail_idx -= pkts_info[slot_idx].descs;
> > +                   vq->shadow_used_idx -= pkts_info[slot_idx].descs;
> > +                   vq->async_pkts_inflight_n--;
> > +                   slot_idx--;
> > +                   pkt_idx--;
> > +           } while (pkt_err);
> 
> That does not sound really robust.
> Maybe you could also exit on slot_idx < 0 to avoid out-of-range
> accesses?
> 
> } while (pkt_err && slot_idx >= 0)
>
slot_idx is an unsigned type and it's designed to work like a ring index. But 
I do need to send a new patch to handle the case where slot_idx == 0.
For robustness, I can add a check: "pkt_idx > 0".
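Something along these lines (a rough sketch only, not the actual follow-up
patch; the explicit ring-mask wrap of slot_idx is one possible way to cover
the slot_idx == 0 case):

/*
 * Sketch: unwind the packets whose async submission failed, guarding on
 * pkt_idx > 0 and wrapping slot_idx explicitly instead of a plain slot_idx--.
 */
if (unlikely(pkt_err)) {
	do {
		if (pkts_info[slot_idx].segs)
			pkt_err--;
		vq->last_avail_idx -= pkts_info[slot_idx].descs;
		vq->shadow_used_idx -= pkts_info[slot_idx].descs;
		vq->async_pkts_inflight_n--;
		/* wrap like a ring index so slot_idx == 0 is handled */
		slot_idx = (slot_idx - 1) & (vq->size - 1);
		pkt_idx--;
	} while (pkt_err && pkt_idx > 0);
}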

Thanks,

Patrick
