Hi Maxime,

Thanks for your comments. Please see replies inline.

> -----Original Message-----
> From: Maxime Coquelin <maxime.coque...@redhat.com>
> Sent: Monday, October 24, 2022 5:08 PM
> To: Wang, YuanX <yuanx.w...@intel.com>; Xia, Chenbo
> <chenbo....@intel.com>
> Cc: dev@dpdk.org; Hu, Jiayu <jiayu...@intel.com>; Jiang, Cheng1
> <cheng1.ji...@intel.com>; Ma, WenwuX <wenwux...@intel.com>; He,
> Xingguang <xingguang...@intel.com>
> Subject: Re: [PATCH v5] net/vhost: support asynchronous data path
> 
> Hi Yuan,
> 
> Please fix your system for the next times, your patch is dated several hours
> in the future.
> 
> On 10/24/22 17:14, Yuan Wang wrote:
> > Vhost asynchronous data-path offloads packet copy from the CPU to the
> > DMA engine. As a result, large packet copy can be accelerated by the
> > DMA engine, and vhost can free CPU cycles for higher level
> 
> Vhost
> 
> > functions.
> >
> > In this patch, we enable asynchronous data-path for vhostpmd.
> > Asynchronous data path is enabled per tx/rx queue, and users need to
> > specify the DMA device used by the tx/rx queue. Each tx/rx queue only
> > supports to use one DMA device, but one DMA device can be shared
> among
> > multiple tx/rx queues of different vhost PMD ports.
> >
> > Two PMD parameters are added:
> > - dmas:     specify the used DMA device for a tx/rx queue.
> >     (Default: no queues enable asynchronous data path)
> > - dma-ring-size: DMA ring size.
> >     (Default: 4096).
> >
> > Here is an example:
> > --vdev 'eth_vhost0,iface=./s0,dmas=[txq0@0000:00.01.0;rxq0@0000:00.01.1],dma-ring-size=4096'
> >
> > Signed-off-by: Jiayu Hu <jiayu...@intel.com>
> > Signed-off-by: Yuan Wang <yuanx.w...@intel.com>
> > Signed-off-by: Wenwu Ma <wenwux...@intel.com>
> >
> > ---
> > v5:
> > - add error log of testpmd cmd
> >
> > v4:
> > - add txq allow_queuing check on poll completed
> > - unregister dma channel in destroy_device
> > - add dma ring size minimum limit
> > - fix serveral code style and logging issues
> >
> > v3:
> > - add the API to version.map
> >
> > v2:
> > - add missing file
> > - hide async_tx_poll_completed
> > - change default DMA ring size to 4096
> >
> > ---
> >   drivers/net/vhost/meson.build     |   1 +
> >   drivers/net/vhost/rte_eth_vhost.c | 512
> ++++++++++++++++++++++++++++--
> >   drivers/net/vhost/rte_eth_vhost.h |  15 +
> >   drivers/net/vhost/version.map     |   7 +
> >   drivers/net/vhost/vhost_testpmd.c |  67 ++++
> >   5 files changed, 569 insertions(+), 33 deletions(-)
> >   create mode 100644 drivers/net/vhost/vhost_testpmd.c
> >
> > diff --git a/drivers/net/vhost/meson.build
> > b/drivers/net/vhost/meson.build index f481a3a4b8..22a0ab3a58 100644
> > --- a/drivers/net/vhost/meson.build
> > +++ b/drivers/net/vhost/meson.build
> > @@ -9,4 +9,5 @@ endif
> >
> >   deps += 'vhost'
> >   sources = files('rte_eth_vhost.c')
> > +testpmd_sources = files('vhost_testpmd.c')
> >   headers = files('rte_eth_vhost.h')
> > diff --git a/drivers/net/vhost/rte_eth_vhost.c
> > b/drivers/net/vhost/rte_eth_vhost.c
> > index b152279fac..80cd9c8d92 100644
> > --- a/drivers/net/vhost/rte_eth_vhost.c
> > +++ b/drivers/net/vhost/rte_eth_vhost.c
> > @@ -7,6 +7,7 @@
> >   #include <pthread.h>
> >   #include <stdbool.h>
> >   #include <sys/epoll.h>
> > +#include <ctype.h>
> >
> >   #include <rte_mbuf.h>
> >   #include <ethdev_driver.h>
> > @@ -18,6 +19,8 @@
> >   #include <rte_kvargs.h>
> >   #include <rte_vhost.h>
> >   #include <rte_spinlock.h>
> > +#include <rte_vhost_async.h>
> > +#include <rte_dmadev.h>
> >
> >   #include "rte_eth_vhost.h"
> >
> > @@ -37,8 +40,15 @@ enum {VIRTIO_RXQ, VIRTIO_TXQ, VIRTIO_QNUM};
> >   #define ETH_VHOST_LINEAR_BUF              "linear-buffer"
> >   #define ETH_VHOST_EXT_BUF         "ext-buffer"
> >   #define ETH_VHOST_LEGACY_OL_FLAGS "legacy-ol-flags"
> > +#define ETH_VHOST_DMA_ARG          "dmas"
> > +#define ETH_VHOST_DMA_RING_SIZE            "dma-ring-size"
> >   #define VHOST_MAX_PKT_BURST 32
> >
> > +#define INVALID_DMA_ID             -1
> > +#define DEFAULT_DMA_RING_SIZE      4096
> > +/* Minimum package size is 64, dma ring size should be greater than 32 */
> > +#define MINIMUM_DMA_RING_SIZE      64
> 
> Why? Is that related to Intel HW?

No, it is not related to Intel HW; it comes from our TCP/UDP tests. The
maximum TCP/UDP packet length is 64KB, so one packet can need 32 pktmbufs.
If the DMA ring size is set to 32, vhost may fail to enqueue/dequeue such a
packet: control packets can be mixed into the traffic, and once the DMA ring
is full the enqueue/dequeue of that packet fails. So the recommended minimum
DMA ring size is greater than 32.

We can remove the macro and add a comment that suggests a DMA ring size
instead.
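
To make the arithmetic concrete, here is a rough sketch, not code from the
patch. It assumes 2KB of data room per pktmbuf (ASSUMED_MBUF_DATA_ROOM is my
own name for that assumption), and both helper functions are hypothetical and
exist only for this illustration.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Assumption for illustration: 2KB of data room per pktmbuf. */
#define ASSUMED_MBUF_DATA_ROOM 2048u
/* Largest TCP/UDP packet mentioned above: 64KB. */
#define MAX_L4_PKT_LEN (64u * 1024u)

/* Hypothetical helper: mbuf segments (one DMA copy each) needed per packet. */
static uint32_t
segs_per_packet(uint32_t pkt_len, uint32_t seg_len)
{
	assert(seg_len > 0);
	return (pkt_len + seg_len - 1) / seg_len; /* round up */
}

/* Hypothetical helper: do the free ring slots cover every segment? */
static bool
ring_can_take_packet(uint32_t ring_size, uint32_t slots_in_use,
		uint32_t pkt_segs)
{
	assert(slots_in_use <= ring_size);
	return ring_size - slots_in_use >= pkt_segs;
}
```

Under these assumptions a 64KB packet needs 32 segments. A ring of 32 with
even one slot held by a control packet has only 31 free slots, so the packet
does not fit; a ring of 64 in the same state has 63 free slots and it does.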
> 
> > +
> >   static const char *valid_arguments[] = {
> >     ETH_VHOST_IFACE_ARG,
> >     ETH_VHOST_QUEUES_ARG,
> > @@ -49,6 +59,8 @@ static const char *valid_arguments[] = {
> >     ETH_VHOST_LINEAR_BUF,
> >     ETH_VHOST_EXT_BUF,
> >     ETH_VHOST_LEGACY_OL_FLAGS,
> > +   ETH_VHOST_DMA_ARG,
> > +   ETH_VHOST_DMA_RING_SIZE,
> >     NULL
> >   };
> >
> > @@ -80,8 +92,39 @@ struct vhost_queue {
> >     struct vhost_stats stats;
> >     int intr_enable;
> >     rte_spinlock_t intr_lock;
> > +
> > +   /* Flag of enabling async data path */
> > +   bool async_register;
> > +   /* DMA device ID */
> > +   int16_t dma_id;
> > +   /**
> > +    * For a Rx queue, "txq" points to its peer Tx queue.
> > +    * For a Tx queue, "txq" is never used.
> > +    */
> > +   struct vhost_queue *txq;
> > +   /* Array to keep DMA completed packets */
> > +   struct rte_mbuf *cmpl_pkts[VHOST_MAX_PKT_BURST];
> >   };
> >
> > +struct dma_input_info {
> > +   int16_t dmas[RTE_MAX_QUEUES_PER_PORT * 2];
> > +   uint16_t dma_ring_size;
> > +};
> > +
> > +static int16_t configured_dmas[RTE_DMADEV_DEFAULT_MAX];
> > +static int dma_count;
> > +
> > +/**
> > + * By default, its Rx path to call rte_vhost_poll_enqueue_completed() for
> enqueue operations.
> > + * However, Rx function is never been called in testpmd "txonly"
> > +mode, thus causing virtio
> > + * cannot receive DMA completed packets. To make txonly mode work
> > +correctly, we provide a
> > + * command in testpmd to call rte_vhost_poll_enqueue_completed() in Tx
> path.
> > + *
> > + * When set async_tx_poll_completed to true, Tx path calls
> > +rte_vhost_poll_enqueue_completed();
> > + * otherwise, Rx path calls it.
> > + */
> > +bool async_tx_poll_completed;
> > +
> >   struct pmd_internal {
> >     rte_atomic32_t dev_attached;
> >     char *iface_name;
> > @@ -94,6 +137,10 @@ struct pmd_internal {
> >     bool vlan_strip;
> >     bool rx_sw_csum;
> >     bool tx_sw_csum;
> > +   struct {
> > +           int16_t dma_id;
> > +           bool async_register;
> > +   } queue_dmas[RTE_MAX_QUEUES_PER_PORT * 2];
> >   };
> >
> >   struct internal_list {
> > @@ -124,6 +171,17 @@ struct rte_vhost_vring_state {
> >
> >   static struct rte_vhost_vring_state *vring_states[RTE_MAX_ETHPORTS];
> >
> > +static bool
> > +dma_is_configured(int16_t dma_id)
> > +{
> > +   int i;
> > +
> > +   for (i = 0; i < dma_count; i++)
> > +           if (configured_dmas[i] == dma_id)
> > +                   return true;
> > +   return false;
> > +}
> > +
> >   static int
> >   vhost_dev_xstats_reset(struct rte_eth_dev *dev)
> >   {
> > @@ -396,15 +454,27 @@ vhost_dev_rx_sw_csum(struct rte_mbuf *mbuf)
> >     mbuf->ol_flags |= RTE_MBUF_F_RX_L4_CKSUM_GOOD;
> >   }
> >
> > +static inline void
> > +vhost_tx_free_completed(int vid, uint16_t virtqueue_id, int16_t dma_id,
> > +           struct rte_mbuf **pkts, uint16_t count) {
> > +   uint16_t i, ret;
> > +
> > +   ret = rte_vhost_poll_enqueue_completed(vid, virtqueue_id, pkts,
> count, dma_id, 0);
> > +   for (i = 0; likely(i < ret); i++)
> > +           rte_pktmbuf_free(pkts[i]);
> > +}
> > +
> >   static uint16_t
> >   eth_vhost_rx(void *q, struct rte_mbuf **bufs, uint16_t nb_bufs)
> >   {
> >     struct vhost_queue *r = q;
> >     uint16_t i, nb_rx = 0;
> >     uint16_t nb_receive = nb_bufs;
> > +   uint16_t nb_pkts, num;
> >
> >     if (unlikely(rte_atomic32_read(&r->allow_queuing) == 0))
> > -           return 0;
> > +           goto tx_poll;
> >
> >     rte_atomic32_set(&r->while_queuing, 1);
> >
> > @@ -412,19 +482,32 @@ eth_vhost_rx(void *q, struct rte_mbuf **bufs,
> uint16_t nb_bufs)
> >             goto out;
> >
> >     /* Dequeue packets from guest TX queue */
> > -   while (nb_receive) {
> > -           uint16_t nb_pkts;
> > -           uint16_t num = (uint16_t)RTE_MIN(nb_receive,
> > -                                            VHOST_MAX_PKT_BURST);
> > -
> > -           nb_pkts = rte_vhost_dequeue_burst(r->vid, r->virtqueue_id,
> > -                                             r->mb_pool, &bufs[nb_rx],
> > -                                             num);
> > -
> > -           nb_rx += nb_pkts;
> > -           nb_receive -= nb_pkts;
> > -           if (nb_pkts < num)
> > -                   break;
> > +   if (!r->async_register) {
> > +           while (nb_receive) {
> > +                   num = (uint16_t)RTE_MIN(nb_receive,
> VHOST_MAX_PKT_BURST);
> > +                   nb_pkts = rte_vhost_dequeue_burst(r->vid, r-
> >virtqueue_id,
> > +                                           r->mb_pool, &bufs[nb_rx],
> > +                                           num);
> > +
> > +                   nb_rx += nb_pkts;
> > +                   nb_receive -= nb_pkts;
> > +                   if (nb_pkts < num)
> > +                           break;
> > +           }
> > +   } else {
> > +           int nr_inflight;
> > +
> > +           while (nb_receive) {
> > +                   num = (uint16_t)RTE_MIN(nb_receive,
> VHOST_MAX_PKT_BURST);
> > +                   nb_pkts = rte_vhost_async_try_dequeue_burst(r-
> >vid, r->virtqueue_id,
> > +                                           r->mb_pool, &bufs[nb_rx],
> num, &nr_inflight,
> > +                                           r->dma_id, 0);
> > +
> > +                   nb_rx += nb_pkts;
> > +                   nb_receive -= nb_pkts;
> > +                   if (nb_pkts < num)
> > +                           break;
> > +           }
> >     }
> >
> >     r->stats.pkts += nb_rx;
> > @@ -445,6 +528,17 @@ eth_vhost_rx(void *q, struct rte_mbuf **bufs,
> uint16_t nb_bufs)
> >   out:
> >     rte_atomic32_set(&r->while_queuing, 0);
> >
> > +tx_poll:
> > +   /**
> > +    * Poll and free completed packets for the virtqueue of Tx queue.
> > +    * Note that we access Tx queue's virtqueue, which is protected
> > +    * by vring lock.
> > +    */
> > +   if (r->txq->async_register && !async_tx_poll_completed &&
> > +                   rte_atomic32_read(&r->txq->allow_queuing) == 1)
> > +           vhost_tx_free_completed(r->vid, r->txq->virtqueue_id, r-
> >txq->dma_id,
> > +                           r->cmpl_pkts, VHOST_MAX_PKT_BURST);
> > +
> >     return nb_rx;
> >   }
> >
> > @@ -456,6 +550,7 @@ eth_vhost_tx(void *q, struct rte_mbuf **bufs,
> uint16_t nb_bufs)
> >     uint16_t nb_send = 0;
> >     uint64_t nb_bytes = 0;
> >     uint64_t nb_missed = 0;
> > +   uint16_t nb_pkts, num;
> >
> >     if (unlikely(rte_atomic32_read(&r->allow_queuing) == 0))
> >             return 0;
> > @@ -486,31 +581,49 @@ eth_vhost_tx(void *q, struct rte_mbuf **bufs,
> uint16_t nb_bufs)
> >     }
> >
> >     /* Enqueue packets to guest RX queue */
> > -   while (nb_send) {
> > -           uint16_t nb_pkts;
> > -           uint16_t num = (uint16_t)RTE_MIN(nb_send,
> > -                                            VHOST_MAX_PKT_BURST);
> > +   if (!r->async_register) {
> > +           while (nb_send) {
> > +                   num = (uint16_t)RTE_MIN(nb_send,
> VHOST_MAX_PKT_BURST);
> > +                   nb_pkts = rte_vhost_enqueue_burst(r->vid, r-
> >virtqueue_id,
> > +                                           &bufs[nb_tx], num);
> > +
> > +                   nb_tx += nb_pkts;
> > +                   nb_send -= nb_pkts;
> > +                   if (nb_pkts < num)
> > +                           break;
> > +           }
> >
> > -           nb_pkts = rte_vhost_enqueue_burst(r->vid, r->virtqueue_id,
> > -                                             &bufs[nb_tx], num);
> > +           for (i = 0; likely(i < nb_tx); i++) {
> > +                   nb_bytes += bufs[i]->pkt_len;
> > +                   rte_pktmbuf_free(bufs[i]);
> > +           }
> >
> > -           nb_tx += nb_pkts;
> > -           nb_send -= nb_pkts;
> > -           if (nb_pkts < num)
> > -                   break;
> > -   }
> > +   } else {
> > +           while (nb_send) {
> > +                   num = (uint16_t)RTE_MIN(nb_send,
> VHOST_MAX_PKT_BURST);
> > +                   nb_pkts = rte_vhost_submit_enqueue_burst(r->vid,
> r->virtqueue_id,
> > +                                                   &bufs[nb_tx], num, r-
> >dma_id, 0);
> > +
> > +                   nb_tx += nb_pkts;
> > +                   nb_send -= nb_pkts;
> > +                   if (nb_pkts < num)
> > +                           break;
> > +           }
> >
> > -   for (i = 0; likely(i < nb_tx); i++)
> > -           nb_bytes += bufs[i]->pkt_len;
> > +           for (i = 0; likely(i < nb_tx); i++)
> > +                   nb_bytes += bufs[i]->pkt_len;
> >
> > -   nb_missed = nb_bufs - nb_tx;
> > +           if (unlikely(async_tx_poll_completed)) {
> > +                   vhost_tx_free_completed(r->vid, r->virtqueue_id, r-
> >dma_id, r->cmpl_pkts,
> > +                                   VHOST_MAX_PKT_BURST);
> > +           }
> > +   }
> >
> > +   nb_missed = nb_bufs - nb_tx;
> >     r->stats.pkts += nb_tx;
> >     r->stats.bytes += nb_bytes;
> >     r->stats.missed_pkts += nb_missed;
> >
> > -   for (i = 0; likely(i < nb_tx); i++)
> > -           rte_pktmbuf_free(bufs[i]);
> >   out:
> >     rte_atomic32_set(&r->while_queuing, 0);
> >
> > @@ -798,6 +911,8 @@ queue_setup(struct rte_eth_dev *eth_dev, struct
> pmd_internal *internal)
> >             vq->vid = internal->vid;
> >             vq->internal = internal;
> >             vq->port = eth_dev->data->port_id;
> > +           if (i < eth_dev->data->nb_tx_queues)
> > +                   vq->txq = eth_dev->data->tx_queues[i];
> >     }
> >     for (i = 0; i < eth_dev->data->nb_tx_queues; i++) {
> >             vq = eth_dev->data->tx_queues[i];
> > @@ -878,6 +993,30 @@ new_device(int vid)
> >     return 0;
> >   }
> >
> > +static inline void
> > +async_clear_virtqueue(int vid, uint16_t virtqueue_id, int16_t dma_id)
> > +{
> > +   struct rte_mbuf *pkts[VHOST_MAX_PKT_BURST];
> > +   uint16_t i, ret, nr_done = 0;
> > +
> > +   if (vid == -1)
> > +           return;
> > +
> > +   while (rte_vhost_async_get_inflight(vid, virtqueue_id) > 0) {
> > +           ret = rte_vhost_clear_queue(vid, virtqueue_id, pkts,
> VHOST_MAX_PKT_BURST, dma_id,
> > +                           0);
> > +           for (i = 0; i < ret ; i++)
> > +                   rte_pktmbuf_free(pkts[i]);
> > +
> > +           nr_done += ret;
> > +   }
> > +   VHOST_LOG(INFO, "Completed %u pkts for vid-%u vring-%u\n",
> nr_done,
> > +vid, virtqueue_id);
> > +
> > +   if (rte_vhost_async_channel_unregister(vid, virtqueue_id))
> > +           VHOST_LOG(ERR, "Failed to unregister async for vid-%u
> vring-%u\n", vid,
> > +                           virtqueue_id);
> > +}
> > +
> >   static void
> >   destroy_device(int vid)
> >   {
> > @@ -908,13 +1047,27 @@ destroy_device(int vid)
> >                     vq = eth_dev->data->rx_queues[i];
> >                     if (!vq)
> >                             continue;
> > +                   if (vq->async_register) {
> > +                           async_clear_virtqueue(vq->vid, vq-
> >virtqueue_id,
> > +                                                   vq->dma_id);
> > +                           vq->async_register = false;
> > +                           internal->queue_dmas[vq-
> >virtqueue_id].async_register = false;
> > +                   }
> >                     vq->vid = -1;
> > +
> >             }
> >             for (i = 0; i < eth_dev->data->nb_tx_queues; i++) {
> >                     vq = eth_dev->data->tx_queues[i];
> >                     if (!vq)
> >                             continue;
> > +                   if (vq->async_register) {
> > +                           async_clear_virtqueue(vq->vid, vq-
> >virtqueue_id,
> > +                                                   vq->dma_id);
> > +                           vq->async_register = false;
> > +                           internal->queue_dmas[vq-
> >virtqueue_id].async_register = false;
> > +                   }
> >                     vq->vid = -1;
> > +
> >             }
> >     }
> >
> > @@ -983,6 +1136,9 @@ vring_state_changed(int vid, uint16_t vring, int
> enable)
> >     struct rte_vhost_vring_state *state;
> >     struct rte_eth_dev *eth_dev;
> >     struct internal_list *list;
> > +   struct vhost_queue *queue;
> > +   struct pmd_internal *internal;
> > +   int qid;
> >     char ifname[PATH_MAX];
> >
> >     rte_vhost_get_ifname(vid, ifname, sizeof(ifname)); @@ -1011,6
> > +1167,65 @@ vring_state_changed(int vid, uint16_t vring, int enable)
> >
> >     update_queuing_status(eth_dev, false);
> >
> > +   qid = vring / VIRTIO_QNUM;
> > +   if (vring % VIRTIO_QNUM == VIRTIO_RXQ)
> > +           queue = eth_dev->data->tx_queues[qid];
> > +   else
> > +           queue = eth_dev->data->rx_queues[qid];
> > +
> > +   if (!queue)
> > +           goto skip;
> > +
> > +   internal = eth_dev->data->dev_private;
> > +
> > +   /* Register async data path for the queue assigned valid DMA device
> */
> > +   if (internal->queue_dmas[queue->virtqueue_id].dma_id ==
> INVALID_DMA_ID)
> > +           goto skip;
> > +
> > +   if (enable && !queue->async_register) {
> > +           if (rte_vhost_async_channel_register_thread_unsafe(vid,
> vring)) {
> > +                   VHOST_LOG(ERR, "Failed to register async for vid-%u
> vring-%u!\n", vid,
> > +                                   vring);
> > +                   return -1;
> > +           }
> > +
> > +           queue->async_register = true;
> > +           internal->queue_dmas[vring].async_register = true;
> > +
> > +           VHOST_LOG(INFO, "Succeed to register async for vid-%u
> vring-%u\n", vid, vring);
> > +   }
> > +
> > +   if (!enable && queue->async_register) {
> > +           struct rte_mbuf *pkts[VHOST_MAX_PKT_BURST];
> > +           uint16_t ret, i, nr_done = 0;
> > +           uint16_t dma_id = queue->dma_id;
> > +
> > +           while (rte_vhost_async_get_inflight_thread_unsafe(vid,
> vring) > 0) {
> > +                   ret = rte_vhost_clear_queue_thread_unsafe(vid,
> vring, pkts,
> > +                                   VHOST_MAX_PKT_BURST, dma_id, 0);
> > +
> > +                   for (i = 0; i < ret ; i++)
> > +                           rte_pktmbuf_free(pkts[i]);
> > +
> > +                   nr_done += ret;
> > +           }
> > +
> > +           VHOST_LOG(INFO, "Completed %u in-flight pkts for vid-%u
> vring-%u\n", nr_done, vid,
> > +                           vring);
> > +
> > +           if (rte_vhost_async_channel_unregister_thread_unsafe(vid,
> vring)) {
> > +                   VHOST_LOG(ERR, "Failed to unregister async for vid-
> %u vring-%u\n", vid,
> > +                                   vring);
> > +                   return -1;
> > +           }
> > +
> > +           queue->async_register = false;
> > +           internal->queue_dmas[vring].async_register = false;
> > +
> > +           VHOST_LOG(INFO, "Succeed to unregister async for vid-%u
> vring-%u\n", vid, vring);
> > +   }
> > +
> > +skip:
> >     VHOST_LOG(INFO, "vring%u is %s\n",
> >                     vring, enable ? "enabled" : "disabled");
> >
> > @@ -1159,6 +1374,12 @@ rte_eth_vhost_get_vid_from_port_id(uint16_t
> port_id)
> >     return vid;
> >   }
> >
> > +void
> > +rte_eth_vhost_async_tx_poll_completed(bool enable) {
> > +   async_tx_poll_completed = enable;
> > +}
> > +
> >   static int
> >   eth_dev_configure(struct rte_eth_dev *dev)
> >   {
> > @@ -1220,6 +1441,7 @@ eth_dev_close(struct rte_eth_dev *dev)
> >   {
> >     struct pmd_internal *internal;
> >     struct internal_list *list;
> > +   struct vhost_queue *queue;
> >     unsigned int i, ret;
> >
> >     if (rte_eal_process_type() != RTE_PROC_PRIMARY) @@ -1233,6
> +1455,25
> > @@ eth_dev_close(struct rte_eth_dev *dev)
> >
> >     list = find_internal_resource(internal->iface_name);
> >     if (list) {
> > +           /* Make sure all in-flight packets are completed before
> destroy vhost */
> > +           if (dev->data->rx_queues) {
> > +                   for (i = 0; i < dev->data->nb_rx_queues; i++) {
> > +                           queue = dev->data->rx_queues[i];
> > +                           if (queue->async_register)
> > +                                   async_clear_virtqueue(queue->vid,
> queue->virtqueue_id,
> > +                                                   queue->dma_id);
> > +                   }
> > +           }
> > +
> > +           if (dev->data->tx_queues) {
> > +                   for (i = 0; i < dev->data->nb_tx_queues; i++) {
> > +                           queue = dev->data->tx_queues[i];
> > +                           if (queue->async_register)
> > +                                   async_clear_virtqueue(queue->vid,
> queue->virtqueue_id,
> > +                                                   queue->dma_id);
> > +                   }
> > +           }
> > +
> >             rte_vhost_driver_unregister(internal->iface_name);
> >             pthread_mutex_lock(&internal_list_lock);
> >             TAILQ_REMOVE(&internal_list, list, next); @@ -1267,6
> +1508,7 @@
> > eth_rx_queue_setup(struct rte_eth_dev *dev, uint16_t rx_queue_id,
> >                struct rte_mempool *mb_pool)
> >   {
> >     struct vhost_queue *vq;
> > +   struct pmd_internal *internal = dev->data->dev_private;
> >
> >     vq = rte_zmalloc_socket(NULL, sizeof(struct vhost_queue),
> >                     RTE_CACHE_LINE_SIZE, socket_id);
> > @@ -1277,6 +1519,8 @@ eth_rx_queue_setup(struct rte_eth_dev *dev,
> > uint16_t rx_queue_id,
> >
> >     vq->mb_pool = mb_pool;
> >     vq->virtqueue_id = rx_queue_id * VIRTIO_QNUM + VIRTIO_TXQ;
> > +   vq->async_register = internal->queue_dmas[vq-
> >virtqueue_id].async_register;
> > +   vq->dma_id = internal->queue_dmas[vq->virtqueue_id].dma_id;
> >     rte_spinlock_init(&vq->intr_lock);
> >     dev->data->rx_queues[rx_queue_id] = vq;
> >
> > @@ -1290,6 +1534,7 @@ eth_tx_queue_setup(struct rte_eth_dev *dev,
> uint16_t tx_queue_id,
> >                const struct rte_eth_txconf *tx_conf __rte_unused)
> >   {
> >     struct vhost_queue *vq;
> > +   struct pmd_internal *internal = dev->data->dev_private;
> >
> >     vq = rte_zmalloc_socket(NULL, sizeof(struct vhost_queue),
> >                     RTE_CACHE_LINE_SIZE, socket_id);
> > @@ -1299,6 +1544,8 @@ eth_tx_queue_setup(struct rte_eth_dev *dev,
> uint16_t tx_queue_id,
> >     }
> >
> >     vq->virtqueue_id = tx_queue_id * VIRTIO_QNUM + VIRTIO_RXQ;
> > +   vq->async_register = internal->queue_dmas[vq-
> >virtqueue_id].async_register;
> > +   vq->dma_id = internal->queue_dmas[vq->virtqueue_id].dma_id;
> >     rte_spinlock_init(&vq->intr_lock);
> >     dev->data->tx_queues[tx_queue_id] = vq;
> >
> > @@ -1509,13 +1756,14 @@ static const struct eth_dev_ops ops = {
> >   static int
> >   eth_dev_vhost_create(struct rte_vdev_device *dev, char *iface_name,
> >     int16_t queues, const unsigned int numa_node, uint64_t flags,
> > -   uint64_t disable_flags)
> > +   uint64_t disable_flags, struct dma_input_info *dma_input)
> >   {
> >     const char *name = rte_vdev_device_name(dev);
> >     struct rte_eth_dev_data *data;
> >     struct pmd_internal *internal = NULL;
> >     struct rte_eth_dev *eth_dev = NULL;
> >     struct rte_ether_addr *eth_addr = NULL;
> > +   int i;
> >
> >     VHOST_LOG(INFO, "Creating VHOST-USER backend on numa
> socket %u\n",
> >             numa_node);
> > @@ -1564,6 +1812,12 @@ eth_dev_vhost_create(struct rte_vdev_device
> *dev, char *iface_name,
> >     eth_dev->rx_pkt_burst = eth_vhost_rx;
> >     eth_dev->tx_pkt_burst = eth_vhost_tx;
> >
> > +   for (i = 0; i < RTE_MAX_QUEUES_PER_PORT * 2; i++) {
> > +           /* Invalid DMA ID indicates the queue does not want to
> enable async data path */
> > +           internal->queue_dmas[i].dma_id = dma_input->dmas[i];
> > +           internal->queue_dmas[i].async_register = false;
> > +   }
> > +
> >     rte_eth_dev_probing_finish(eth_dev);
> >     return 0;
> >
> > @@ -1603,6 +1857,155 @@ open_int(const char *key __rte_unused, const
> char *value, void *extra_args)
> >     return 0;
> >   }
> >
> > +static int
> > +init_dma(int16_t dma_id, uint16_t ring_size) {
> > +   struct rte_dma_info info;
> > +   struct rte_dma_conf dev_config = { .nb_vchans = 1 };
> > +   struct rte_dma_vchan_conf qconf = {
> > +           .direction = RTE_DMA_DIR_MEM_TO_MEM,
> > +   };
> > +   int ret = 0;
> > +
> > +   if (dma_is_configured(dma_id))
> > +           goto out;
> > +
> > +   if (rte_dma_info_get(dma_id, &info) != 0) {
> > +           VHOST_LOG(ERR, "dma %u get info failed\n", dma_id);
> > +           ret = -1;
> > +           goto out;
> > +   }
> > +
> > +   if (info.max_vchans < 1) {
> > +           VHOST_LOG(ERR, "No channels available on dma %d\n",
> dma_id);
> > +           ret = -1;
> > +           goto out;
> > +   }
> > +
> > +   if (rte_dma_configure(dma_id, &dev_config) != 0) {
> > +           VHOST_LOG(ERR, "dma %u configure failed\n", dma_id);
> > +           ret = -1;
> > +           goto out;
> > +   }
> > +
> > +   rte_dma_info_get(dma_id, &info);
> > +   if (info.nb_vchans != 1) {
> > +           VHOST_LOG(ERR, "dma %u has no queues\n", dma_id);
> > +           ret = -1;
> > +           goto out;
> > +   }
> > +
> > +   ring_size = RTE_MAX(ring_size, MINIMUM_DMA_RING_SIZE);
> > +   ring_size = RTE_MAX(ring_size, info.min_desc);
> > +   qconf.nb_desc = RTE_MIN(ring_size, info.max_desc);
> > +   if (rte_dma_vchan_setup(dma_id, 0, &qconf) != 0) {
> > +           VHOST_LOG(ERR, "dma %u queue setup failed\n", dma_id);
> > +           ret = -1;
> > +           goto out;
> > +   }
> > +
> > +   if (rte_dma_start(dma_id) != 0) {
> > +           VHOST_LOG(ERR, "dma %u start failed\n", dma_id);
> > +           ret = -1;
> > +           goto out;
> > +   }
> > +
> > +   configured_dmas[dma_count++] = dma_id;
> > +
> > +out:
> > +   return ret;
> > +}
> > +
> > +static int
> > +open_dma(const char *key __rte_unused, const char *value, void
> > +*extra_args) {
> > +   struct dma_input_info *dma_input = extra_args;
> > +   char *input = strndup(value, strlen(value) + 1);
> > +   char *addrs = input;
> > +   char *ptrs[2];
> > +   char *start, *end, *substr;
> > +   uint16_t qid, virtqueue_id;
> > +   int16_t dma_id;
> > +   int i, ret = 0;
> > +
> > +   while (isblank(*addrs))
> > +           addrs++;
> > +   if (*addrs == '\0') {
> > +           VHOST_LOG(ERR, "No input DMA addresses\n");
> > +           ret = -1;
> > +           goto out;
> > +   }
> > +
> > +   /* process DMA devices within bracket. */
> > +   addrs++;
> > +   substr = strtok(addrs, ";]");
> > +   if (!substr) {
> > +           VHOST_LOG(ERR, "No input DMA addresse\n");
> > +           ret = -1;
> > +           goto out;
> > +   }
> > +
> > +   do {
> > +           rte_strsplit(substr, strlen(substr), ptrs, 2, '@');
> > +
> > +           char *txq, *rxq;
> > +           bool is_txq;
> > +
> > +           txq = strstr(ptrs[0], "txq");
> > +           rxq = strstr(ptrs[0], "rxq");
> > +           if (txq == NULL && rxq == NULL) {
> > +                   VHOST_LOG(ERR, "Illegal queue\n");
> > +                   ret = -1;
> > +                   goto out;
> > +           } else if (txq) {
> > +                   is_txq = true;
> > +                   start = txq;
> > +           } else {
> > +                   is_txq = false;
> > +                   start = rxq;
> > +           }
> > +
> > +           start += 3;
> > +           qid = strtol(start, &end, 0);
> > +           if (end == start) {
> > +                   VHOST_LOG(ERR, "No input queue ID\n");
> > +                   ret = -1;
> > +                   goto out;
> > +           }
> > +
> > +           virtqueue_id = is_txq ? qid * 2 + VIRTIO_RXQ : qid * 2 +
> > +VIRTIO_TXQ;
> > +
> > +           dma_id = rte_dma_get_dev_id_by_name(ptrs[1]);
> > +           if (dma_id < 0) {
> > +                   VHOST_LOG(ERR, "Fail to find DMA device %s.\n",
> ptrs[1]);
> > +                   ret = -1;
> > +                   goto out;
> > +           }
> > +
> > +           ret = init_dma(dma_id, dma_input->dma_ring_size);
> > +           if (ret != 0) {
> > +                   VHOST_LOG(ERR, "Fail to initialize DMA %u\n",
> dma_id);
> > +                   ret = -1;
> > +                   break;
> > +           }
> > +
> > +           dma_input->dmas[virtqueue_id] = dma_id;
> > +
> > +           substr = strtok(NULL, ";]");
> > +   } while (substr);
> > +
> > +   for (i = 0; i < dma_count; i++) {
> > +           if (rte_vhost_async_dma_configure(configured_dmas[i], 0) <
> 0) {
> > +                   VHOST_LOG(ERR, "Fail to configure DMA %u to
> vhost\n", configured_dmas[i]);
> > +                   ret = -1;
> > +           }
> > +   }
> > +
> > +out:
> > +   free(input);
> > +   return ret;
> > +}
> > +
> >   static int
> >   rte_pmd_vhost_probe(struct rte_vdev_device *dev)
> >   {
> > @@ -1621,6 +2024,10 @@ rte_pmd_vhost_probe(struct rte_vdev_device
> *dev)
> >     int legacy_ol_flags = 0;
> >     struct rte_eth_dev *eth_dev;
> >     const char *name = rte_vdev_device_name(dev);
> > +   struct dma_input_info dma_input;
> > +
> > +   memset(dma_input.dmas, INVALID_DMA_ID,
> sizeof(dma_input.dmas));
> > +   dma_input.dma_ring_size = DEFAULT_DMA_RING_SIZE;
> >
> >     VHOST_LOG(INFO, "Initializing pmd_vhost for %s\n", name);
> >
> > @@ -1736,6 +2143,43 @@ rte_pmd_vhost_probe(struct rte_vdev_device
> *dev)
> >                     goto out_free;
> >     }
> >
> > +   if (rte_kvargs_count(kvlist, ETH_VHOST_DMA_RING_SIZE) == 1) {
> > +           if (rte_kvargs_count(kvlist, ETH_VHOST_DMA_ARG) == 1) {
> > +                   ret = rte_kvargs_process(kvlist,
> ETH_VHOST_DMA_RING_SIZE,
> > +                                   &open_int,
> &dma_input.dma_ring_size);
> > +                   if (ret < 0)
> > +                           goto out_free;
> > +
> > +                   if (!rte_is_power_of_2(dma_input.dma_ring_size)) {
> > +                           dma_input.dma_ring_size =
> rte_align32pow2(dma_input.dma_ring_size);
> > +                           VHOST_LOG(INFO, "Convert dma_ring_size to
> the power of two %u\n",
> > +                                           dma_input.dma_ring_size);
> > +                   }
> > +           } else {
> > +                   VHOST_LOG(WARNING, "%s is not specified, skip to
> parse %s\n",
> > +                                   ETH_VHOST_DMA_ARG,
> ETH_VHOST_DMA_RING_SIZE);
> > +           }
> > +   }
> > +
> > +   if (rte_kvargs_count(kvlist, ETH_VHOST_DMA_ARG) == 1) {
> > +           if (rte_kvargs_count(kvlist, ETH_VHOST_DMA_RING_SIZE) ==
> 0)
> > +                   VHOST_LOG(INFO, "Use default dma ring size %d\n",
> > +DEFAULT_DMA_RING_SIZE);
> > +
> > +           ret = rte_kvargs_process(kvlist, ETH_VHOST_DMA_ARG,
> > +                                    &open_dma, &dma_input);
> > +           if (ret < 0) {
> > +                   VHOST_LOG(ERR, "Failed to parse %s\n",
> ETH_VHOST_DMA_ARG);
> > +                   goto out_free;
> > +           }
> > +
> > +           flags |= RTE_VHOST_USER_ASYNC_COPY;
> > +           /**
> > +            * Don't support live migration when enable
> > +            * DMA acceleration.
> > +            */
> > +           disable_flags |= (1ULL << VHOST_F_LOG_ALL);
> > +   }
> > +
> >     if (legacy_ol_flags == 0)
> >             flags |= RTE_VHOST_USER_NET_COMPLIANT_OL_FLAGS;
> >
> > @@ -1743,7 +2187,7 @@ rte_pmd_vhost_probe(struct rte_vdev_device
> *dev)
> >             dev->device.numa_node = rte_socket_id();
> >
> >     ret = eth_dev_vhost_create(dev, iface_name, queues,
> > -                              dev->device.numa_node, flags,
> disable_flags);
> > +                   dev->device.numa_node, flags, disable_flags,
> &dma_input);
> >     if (ret == -1)
> >             VHOST_LOG(ERR, "Failed to create %s\n", name);
> >
> > @@ -1787,4 +2231,6 @@ RTE_PMD_REGISTER_PARAM_STRING(net_vhost,
> >     "postcopy-support=<0|1> "
> >     "tso=<0|1> "
> >     "linear-buffer=<0|1> "
> > -   "ext-buffer=<0|1>");
> > +   "ext-buffer=<0|1> "
> > +   "dma-ring-size=<int> "
> > +   "dmas=[txq0@dma_addr;rxq0@dma_addr]");
> > diff --git a/drivers/net/vhost/rte_eth_vhost.h
> > b/drivers/net/vhost/rte_eth_vhost.h
> > index 0e68b9f668..146c98803d 100644
> > --- a/drivers/net/vhost/rte_eth_vhost.h
> > +++ b/drivers/net/vhost/rte_eth_vhost.h
> > @@ -52,6 +52,21 @@ int rte_eth_vhost_get_queue_event(uint16_t port_id,
> >    */
> >   int rte_eth_vhost_get_vid_from_port_id(uint16_t port_id);
> >
> > +/**
> > + * @warning
> > + * @b EXPERIMENTAL: this API may change, or be removed, without prior
> > +notice
> > + *
> > + * By default, rte_vhost_poll_enqueue_completed() is called in Rx path.
> > + * This function enables Tx path, rather than Rx path, to poll
> > +completed
> > + * packets for vhost async enqueue operations. Note that virtio may
> > +never
> > + * receive DMA completed packets if there are no more Tx operations.
> > + *
> > + * @param enable
> > + *  True indicates Tx path to call rte_vhost_poll_enqueue_completed().
> > + */
> > +__rte_experimental
> > +void rte_eth_vhost_async_tx_poll_completed(bool enable);
> > +
> >   #ifdef __cplusplus
> >   }
> >   #endif
> > diff --git a/drivers/net/vhost/version.map
> > b/drivers/net/vhost/version.map index e42c89f1eb..0a40441227 100644
> > --- a/drivers/net/vhost/version.map
> > +++ b/drivers/net/vhost/version.map
> > @@ -6,3 +6,10 @@ DPDK_23 {
> >
> >     local: *;
> >   };
> > +
> > +EXPERIMENTAL {
> > +   global:
> > +
> > +   # added in 22.11
> > +   rte_eth_vhost_async_tx_poll_completed;
> 
> I'm not in favor of adding driver-specific API.
> IMHO, you should just decide whether doing completion on Tx or Rx path
> and stick with it.
> 
> Doing completion on the Tx path not being reliable (i.e. if not other packets
> are sent, last enqueued packets are not completed), I guess only doing
> completion in Rx path is the right thing to do.

Thanks, it's good to know your thoughts. But if we remove the driver-specific
API and call poll_enqueue_completed only in the Rx path, testpmd txonly mode
cannot work with DMA-accelerated enqueue, because nothing will ever call
poll_enqueue_completed. Any thoughts?
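
A toy model of the concern, in case it helps the discussion. This is not DPDK
code: the struct and the toy_* functions are hypothetical, DMA copies are
assumed to finish instantly, and a ring slot is assumed to stay occupied until
someone polls for completions.

```c
#include <stdbool.h>
#include <stdint.h>

/* Toy model of one async Tx queue; not a DPDK structure. */
struct toy_txq {
	uint16_t ring_size; /* capacity for in-flight copies */
	uint16_t inflight;  /* submitted but not yet reaped */
};

/* Submit up to n packets; returns how many fit. */
static uint16_t
toy_submit(struct toy_txq *q, uint16_t n)
{
	uint16_t free_slots = q->ring_size - q->inflight;
	uint16_t done = n < free_slots ? n : free_slots;

	q->inflight += done;
	return done;
}

/* Reap up to n completions; stands in for poll_enqueue_completed. */
static uint16_t
toy_poll_completed(struct toy_txq *q, uint16_t n)
{
	uint16_t done = n < q->inflight ? n : q->inflight;

	q->inflight -= done;
	return done;
}

/* txonly loop: the Rx path never runs, so only the Tx path can poll. */
static uint32_t
toy_txonly(struct toy_txq *q, unsigned int bursts, bool poll_in_tx)
{
	uint32_t sent = 0;
	unsigned int i;

	for (i = 0; i < bursts; i++) {
		sent += toy_submit(q, 32);
		if (poll_in_tx)
			toy_poll_completed(q, 32);
	}
	return sent;
}
```

In this model, with a ring of 4096 and 1000 bursts of 32, the run without
Tx-side polling stops accepting packets once 4096 are in flight, while the run
with Tx-side polling accepts all 32000.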

Thanks,
Jiayu
