Hi Maxime,

Thanks for your comments. Replies are inline.

> -----Original Message-----
> From: Maxime Coquelin <maxime.coque...@redhat.com>
> Sent: Tuesday, July 13, 2021 10:30 PM
> To: Ma, WenwuX <wenwux...@intel.com>; dev@dpdk.org
> Cc: Xia, Chenbo <chenbo....@intel.com>; Jiang, Cheng1
> <cheng1.ji...@intel.com>; Hu, Jiayu <jiayu...@intel.com>; Wang, YuanX
> <yuanx.w...@intel.com>
> Subject: Re: [PATCH v5 3/4] vhost: support async dequeue for split ring
> >  struct async_inflight_info {
> >     struct rte_mbuf *mbuf;
> > -   uint16_t descs; /* num of descs inflight */
> > +   union {
> > +           uint16_t descs; /* num of descs in-flight */
> > +           struct async_nethdr nethdr;
> > +   };
> >     uint16_t nr_buffers; /* num of buffers inflight for packed ring */
> > -};
> > +} __rte_cache_aligned;
> 
> Does it really need to be cache aligned?

How about changing it to 32-byte alignment, so that a cache line can hold two objects?

> 
> >
> >  /**
> >   *  dma channel feature bit definition @@ -193,4 +201,34 @@
> > __rte_experimental  uint16_t rte_vhost_poll_enqueue_completed(int vid,
> > uint16_t queue_id,
> >             struct rte_mbuf **pkts, uint16_t count);
> >
> > +/**
> > + * This function tries to receive packets from the guest with
> > +offloading
> > + * large copies to the DMA engine. Successfully dequeued packets are
> > + * transfer completed, either by the CPU or the DMA engine, and they
> > +are
> > + * returned in "pkts". There may be other packets that are sent from
> > + * the guest but being transferred by the DMA engine, called
> > +in-flight
> > + * packets. The amount of in-flight packets by now is returned in
> > + * "nr_inflight". This function will return in-flight packets only
> > +after
> > + * the DMA engine finishes transferring.
> 
> I am not sure to understand that comment. Is it still "in-flight" if the DMA
> transfer is completed?

"in-flight" means packet copies are submitted to the DMA, but the DMA hasn't
completed copies.

> 
> Are we ensuring packets are not reordered with this way of working?

There is a threshold that can be set by users. If it is set to 0, which means
all packet copies are assigned to the DMA, the packets sent from the guest will
not be reordered.

> 
> > + *
> > + * @param vid
> > + *  id of vhost device to dequeue data
> > + * @param queue_id
> > + *  queue id to dequeue data
> > + * @param pkts
> > + *  blank array to keep successfully dequeued packets
> > + * @param count
> > + *  size of the packet array
> > + * @param nr_inflight
> > + *  the amount of in-flight packets by now. If error occurred, its
> > + *  value is set to -1.
> > + * @return
> > + *  num of successfully dequeued packets  */ __rte_experimental
> > +uint16_t rte_vhost_async_try_dequeue_burst(int vid, uint16_t
> > +queue_id,
> > +   struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t
> count,
> > +   int *nr_inflight);
> > +
> >  #endif /* _RTE_VHOST_ASYNC_H_ */
> > diff --git a/lib/vhost/version.map b/lib/vhost/version.map index
> > 9103a23cd4..a320f889cd 100644
> > --- a/lib/vhost/version.map
> > +++ b/lib/vhost/version.map
> > @@ -79,4 +79,7 @@ EXPERIMENTAL {
> >
> >     # added in 21.05
> >     rte_vhost_get_negotiated_protocol_features;
> > +
> > +   # added in 21.08
> > +   rte_vhost_async_try_dequeue_burst;
> >  };
> > diff --git a/lib/vhost/virtio_net.c b/lib/vhost/virtio_net.c index
> > b93482587c..52237e8600 100644
> > --- a/lib/vhost/virtio_net.c
> > +++ b/lib/vhost/virtio_net.c
> > @@ -2673,6 +2673,32 @@ virtio_dev_pktmbuf_prep(struct virtio_net *dev,
> struct rte_mbuf *pkt,
> >     return -1;
> >  }
> >
> > +/*
> > + * Allocate a host supported pktmbuf.
> > + */
> > +static __rte_always_inline struct rte_mbuf *
> > +virtio_dev_pktmbuf_alloc(struct virtio_net *dev, struct rte_mempool *mp,
> > +                    uint32_t data_len)
> > +{
> > +   struct rte_mbuf *pkt = rte_pktmbuf_alloc(mp);
> > +
> > +   if (unlikely(pkt == NULL)) {
> > +           VHOST_LOG_DATA(ERR,
> > +                   "Failed to allocate memory for mbuf.\n");
> > +           return NULL;
> > +   }
> > +
> > +   if (virtio_dev_pktmbuf_prep(dev, pkt, data_len)) {
> > +           /* Data doesn't fit into the buffer and the host supports
> > +            * only linear buffers
> > +            */
> > +           rte_pktmbuf_free(pkt);
> > +           return NULL;
> > +   }
> > +
> > +   return pkt;
> > +}
> > +
> 
> I think you should be able to use rte_pktmbuf_alloc_bulk and
> virtio_dev_pktmbuf_prep instead of re-introducing the function that was
> removed by Balazs. It should help perf a bit.
> 
> >  __rte_always_inline
> >  static uint16_t
> >  virtio_dev_tx_split(struct virtio_net *dev, struct vhost_virtqueue
> > *vq, @@ -3147,3 +3173,578 @@ rte_vhost_dequeue_burst(int vid,
> uint16_t
> > queue_id,
> >
> >     return count;
> >  }
> > +
> > +
> > +static __rte_always_inline uint16_t
> > +virtio_dev_tx_async_split(struct virtio_net *dev,
> > +           struct vhost_virtqueue *vq, uint16_t queue_id,
> > +           struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts,
> > +           uint16_t count, bool legacy_ol_flags) {
> > +   static bool allocerr_warned;
> > +   uint16_t pkt_idx;
> > +   uint16_t free_entries;
> > +   uint16_t slot_idx = 0;
> > +   uint16_t segs_await = 0;
> > +   uint16_t nr_done_pkts = 0, nr_async_pkts = 0, nr_async_cmpl_pkts =
> 0;
> > +   uint16_t nr_async_burst = 0;
> > +   uint16_t pkt_err = 0;
> > +   uint16_t iovec_idx = 0, it_idx = 0;
> > +
> > +   struct rte_vhost_iov_iter *it_pool = vq->it_pool;
> > +   struct iovec *vec_pool = vq->vec_pool;
> > +   struct iovec *src_iovec = vec_pool;
> > +   struct iovec *dst_iovec = vec_pool + (VHOST_MAX_ASYNC_VEC >> 1);
> > +   struct rte_vhost_async_desc tdes[MAX_PKT_BURST];
> > +   struct async_inflight_info *pkts_info = vq->async_pkts_info;
> > +
> > +   struct async_pkt_index {
> > +           uint16_t last_avail_idx;
> > +   } async_pkts_log[MAX_PKT_BURST];
> > +
> > +   /**
> > +    * The ordering between avail index and
> > +    * desc reads needs to be enforced.
> > +    */
> > +   free_entries = __atomic_load_n(&vq->avail->idx,
> __ATOMIC_ACQUIRE) -
> > +                   vq->last_avail_idx;
> > +   if (free_entries == 0)
> > +           goto out;
> > +
> > +   rte_prefetch0(&vq->avail->ring[vq->last_avail_idx & (vq->size -
> > +1)]);
> > +
> > +   count = RTE_MIN(count, MAX_PKT_BURST);
> > +   count = RTE_MIN(count, free_entries);
> > +   VHOST_LOG_DATA(DEBUG, "(%d) about to dequeue %u buffers\n",
> > +                   dev->vid, count);
> > +
> > +   for (pkt_idx = 0; pkt_idx < count; pkt_idx++) {
> > +           uint16_t head_idx = 0;
> > +           uint16_t nr_vec = 0;
> > +           uint32_t buf_len;
> > +           int err;
> > +           struct buf_vector buf_vec[BUF_VECTOR_MAX];
> > +           struct rte_mbuf *pkt;
> > +
> > +           if (unlikely(fill_vec_buf_split(dev, vq, vq->last_avail_idx,
> > +                                           &nr_vec, buf_vec,
> > +                                           &head_idx, &buf_len,
> > +                                           VHOST_ACCESS_RO) < 0))
> > +                   break;
> > +
> > +           pkt = virtio_dev_pktmbuf_alloc(dev, mbuf_pool, buf_len);
> > +           if (unlikely(pkt == NULL)) {
> > +                   /**
> > +                    * mbuf allocation fails for jumbo packets when
> external
> > +                    * buffer allocation is not allowed and linear buffer
> > +                    * is required. Drop this packet.
> > +                    */
> > +                   if (!allocerr_warned) {
> > +                           VHOST_LOG_DATA(ERR,
> > +                                   "Failed mbuf alloc of size %d from %s
> on %s.\n",
> > +                                   buf_len, mbuf_pool->name, dev-
> >ifname);
> > +                           allocerr_warned = true;
> > +                   }
> > +                   break;
> > +           }
> > +
> > +           slot_idx = (vq->async_pkts_idx + nr_async_pkts) &
> > +                           (vq->size - 1);
> > +           err = async_desc_to_mbuf(dev, vq, buf_vec, nr_vec, pkt,
> > +                           mbuf_pool, &src_iovec[iovec_idx],
> > +                           &dst_iovec[iovec_idx], &it_pool[it_idx],
> > +                           &it_pool[it_idx + 1],
> > +                           &pkts_info[slot_idx].nethdr, legacy_ol_flags);
> > +           if (unlikely(err)) {
> > +                   rte_pktmbuf_free(pkt);
> > +                   if (!allocerr_warned) {
> > +                           VHOST_LOG_DATA(ERR,
> > +                                   "Failed to copy desc to mbuf
> on %s.\n",
> > +                                   dev->ifname);
> > +                           allocerr_warned = true;
> > +                   }
> > +                   break;
> > +           }
> > +
> > +           if (it_pool[it_idx].count) {
> > +                   uint16_t to = vq->async_desc_idx_split & (vq->size -
> 1);
> > +
> > +                   async_fill_desc(&tdes[nr_async_burst],
> &it_pool[it_idx],
> > +                           &it_pool[it_idx + 1]);
> > +                   pkts_info[slot_idx].mbuf = pkt;
> > +                   async_pkts_log[nr_async_pkts++].last_avail_idx =
> > +                           vq->last_avail_idx;
> > +                   nr_async_burst++;
> > +                   iovec_idx += it_pool[it_idx].nr_segs;
> > +                   it_idx += 2;
> > +                   segs_await += it_pool[it_idx].nr_segs;
> > +
> > +                   /* keep used desc */
> > +                   vq->async_descs_split[to].id = head_idx;
> > +                   vq->async_descs_split[to].len = 0;
> > +                   vq->async_desc_idx_split++;
> > +           } else {
> > +                   update_shadow_used_ring_split(vq, head_idx, 0);
> > +                   pkts[nr_done_pkts++] = pkt;
> > +           }
> > +
> > +           vq->last_avail_idx++;
> > +
> > +           if (unlikely((nr_async_burst >=
> VHOST_ASYNC_BATCH_THRESHOLD) ||
> > +                                   ((VHOST_MAX_ASYNC_VEC >> 1) -
> > +                                    segs_await < BUF_VECTOR_MAX))) {
> > +                   uint16_t nr_pkts;
> > +
> > +                   nr_pkts = vq->async_ops.transfer_data(dev->vid,
> > +                                   queue_id, tdes, 0, nr_async_burst);
> > +                   src_iovec = vec_pool;
> > +                   dst_iovec = vec_pool + (VHOST_MAX_ASYNC_VEC >>
> 1);
> > +                   it_idx = 0;
> > +                   segs_await = 0;
> > +                   vq->async_pkts_inflight_n += nr_pkts;
> > +
> > +                   if (unlikely(nr_pkts < nr_async_burst)) {
> > +                           pkt_err = nr_async_burst - nr_pkts;
> > +                           nr_async_burst = 0;
> > +                           break;
> > +                   }
> > +                   nr_async_burst = 0;
> > +           }
> > +   }
> > +
> > +   if (nr_async_burst) {
> > +           uint32_t nr_pkts;
> > +
> > +           nr_pkts = vq->async_ops.transfer_data(dev->vid, queue_id,
> > +                           tdes, 0, nr_async_burst);
> > +           vq->async_pkts_inflight_n += nr_pkts;
> > +
> > +           if (unlikely(nr_pkts < nr_async_burst))
> > +                   pkt_err = nr_async_burst - nr_pkts;
> > +   }
> > +
> > +   do_data_copy_dequeue(vq);
> > +
> > +   if (unlikely(pkt_err)) {
> > +           uint16_t nr_err_dma = pkt_err;
> > +           uint16_t nr_err_sw;
> > +
> > +           nr_async_pkts -= nr_err_dma;
> > +
> > +           /**
> > +            * revert shadow used ring and free pktmbufs for
> > +            * CPU-copied pkts after the first DMA-error pkt.
> > +            */
> > +           nr_err_sw = vq->last_avail_idx -
> > +                   async_pkts_log[nr_async_pkts].last_avail_idx -
> > +                   nr_err_dma;
> > +           vq->shadow_used_idx -= nr_err_sw;
> > +           while (nr_err_sw-- > 0)
> > +                   rte_pktmbuf_free(pkts[--nr_done_pkts]);
> > +
> > +           /**
> > +            * recover DMA-copy related structures and free pktmbufs
> > +            * for DMA-error pkts.
> > +            */
> > +           vq->async_desc_idx_split -= nr_err_dma;
> > +           while (nr_err_dma-- > 0) {
> > +                   rte_pktmbuf_free(
> > +                           pkts_info[slot_idx & (vq->size - 1)].mbuf);
> > +                   slot_idx--;
> > +           }
> > +
> > +           /* recover available ring */
> > +           vq->last_avail_idx =
> > +                   async_pkts_log[nr_async_pkts].last_avail_idx;
> > +   }
> > +
> > +   vq->async_pkts_idx += nr_async_pkts;
> > +
> > +   if (likely(vq->shadow_used_idx))
> > +           flush_shadow_used_ring_split(dev, vq);
> > +
> > +out:
> > +   if (nr_done_pkts < count && vq->async_pkts_inflight_n > 0) {
> > +           nr_async_cmpl_pkts =
> async_poll_dequeue_completed_split(dev, vq,
> > +                                   queue_id, &pkts[nr_done_pkts],
> > +                                   count - nr_done_pkts,
> > +                                   legacy_ol_flags);
> > +           nr_done_pkts += nr_async_cmpl_pkts;
> > +   }
> > +   if (likely(nr_done_pkts))
> > +           vhost_vring_call_split(dev, vq);
> > +
> > +   return nr_done_pkts;
> > +}
> > +
> > +__rte_noinline
> > +static uint16_t
> > +virtio_dev_tx_async_split_legacy(struct virtio_net *dev,
> > +           struct vhost_virtqueue *vq, uint16_t queue_id,
> > +           struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts,
> > +           uint16_t count)
> > +{
> > +   return virtio_dev_tx_async_split(dev, vq, queue_id, mbuf_pool,
> > +                           pkts, count, true);
> 
> I think we don't need to support legacy offload.
> It may be better to have the Vhost example to support the compliant way,
> what do you think?

The legacy offload is disabled by RTE_VHOST_USER_NET_COMPLIANT_OL_FLAGS,
and compliant mode is disabled by default. If we don't implement legacy mode in
the async dequeue code, how should we handle the case where users don't set the flag?

Thanks,
Jiayu

Reply via email to