Hi Maxime,

Thanks for your comments. Please see replies inline.
> -----Original Message----- > From: Maxime Coquelin <maxime.coque...@redhat.com> > Sent: Monday, October 24, 2022 5:08 PM > To: Wang, YuanX <yuanx.w...@intel.com>; Xia, Chenbo > <chenbo....@intel.com> > Cc: dev@dpdk.org; Hu, Jiayu <jiayu...@intel.com>; Jiang, Cheng1 > <cheng1.ji...@intel.com>; Ma, WenwuX <wenwux...@intel.com>; He, > Xingguang <xingguang...@intel.com> > Subject: Re: [PATCH v5] net/vhost: support asynchronous data path > > Hi Yuan, > > Please fix your system for the next times, your patch is dated several hours > in > the future. > > On 10/24/22 17:14, Yuan Wang wrote: > > Vhost asynchronous data-path offloads packet copy from the CPU to the > > DMA engine. As a result, large packet copy can be accelerated by the > > DMA engine, and vhost can free CPU cycles for higher level > > Vhost > > > functions. > > > > In this patch, we enable asynchronous data-path for vhostpmd. > > Asynchronous data path is enabled per tx/rx queue, and users need to > > specify the DMA device used by the tx/rx queue. Each tx/rx queue only > > supports to use one DMA device, but one DMA device can be shared > among > > multiple tx/rx queues of different vhost PMD ports. > > > > Two PMD parameters are added: > > - dmas: specify the used DMA device for a tx/rx queue. > > (Default: no queues enable asynchronous data path) > > - dma-ring-size: DMA ring size. > > (Default: 4096). > > > > Here is an example: > > --vdev > 'eth_vhost0,iface=./s0,dmas=[txq0@0000:00.01.0;rxq0@0000:00.01.1],dma- > ring-size=4096' > > > > Signed-off-by: Jiayu Hu <jiayu...@intel.com> > > Signed-off-by: Yuan Wang <yuanx.w...@intel.com> > > Signed-off-by: Wenwu Ma <wenwux...@intel.com> > > > > --- > > v5: > > - add error log of testpmd cmd > > > > v4: > > - add txq allow_queuing check on poll completed > > - unregister dma channel in destroy_device > > - add dma ring size minimum limit > > - fix serveral code style and logging issues > > > > v3: > > - add the API to version.map > > > > v2: > > - add missing file > > - hide async_tx_poll_completed > > - change default DMA ring size to 4096 > > > > --- > > drivers/net/vhost/meson.build | 1 + > > drivers/net/vhost/rte_eth_vhost.c | 512 > ++++++++++++++++++++++++++++-- > > drivers/net/vhost/rte_eth_vhost.h | 15 + > > drivers/net/vhost/version.map | 7 + > > drivers/net/vhost/vhost_testpmd.c | 67 ++++ > > 5 files changed, 569 insertions(+), 33 deletions(-) > > create mode 100644 drivers/net/vhost/vhost_testpmd.c > > > > diff --git a/drivers/net/vhost/meson.build > > b/drivers/net/vhost/meson.build index f481a3a4b8..22a0ab3a58 100644 > > --- a/drivers/net/vhost/meson.build > > +++ b/drivers/net/vhost/meson.build > > @@ -9,4 +9,5 @@ endif > > > > deps += 'vhost' > > sources = files('rte_eth_vhost.c') > > +testpmd_sources = files('vhost_testpmd.c') > > headers = files('rte_eth_vhost.h') > > diff --git a/drivers/net/vhost/rte_eth_vhost.c > > b/drivers/net/vhost/rte_eth_vhost.c > > index b152279fac..80cd9c8d92 100644 > > --- a/drivers/net/vhost/rte_eth_vhost.c > > +++ b/drivers/net/vhost/rte_eth_vhost.c > > @@ -7,6 +7,7 @@ > > #include <pthread.h> > > #include <stdbool.h> > > #include <sys/epoll.h> > > +#include <ctype.h> > > > > #include <rte_mbuf.h> > > #include <ethdev_driver.h> > > @@ -18,6 +19,8 @@ > > #include <rte_kvargs.h> > > #include <rte_vhost.h> > > #include <rte_spinlock.h> > > +#include <rte_vhost_async.h> > > +#include <rte_dmadev.h> > > > > #include "rte_eth_vhost.h" > > > > @@ -37,8 +40,15 @@ enum {VIRTIO_RXQ, VIRTIO_TXQ, VIRTIO_QNUM}; > > #define ETH_VHOST_LINEAR_BUF 
"linear-buffer" > > #define ETH_VHOST_EXT_BUF "ext-buffer" > > #define ETH_VHOST_LEGACY_OL_FLAGS "legacy-ol-flags" > > +#define ETH_VHOST_DMA_ARG "dmas" > > +#define ETH_VHOST_DMA_RING_SIZE "dma-ring-size" > > #define VHOST_MAX_PKT_BURST 32 > > > > +#define INVALID_DMA_ID -1 > > +#define DEFAULT_DMA_RING_SIZE 4096 > > +/* Minimum package size is 64, dma ring size should be greater than 32 */ > > +#define MINIMUM_DMA_RING_SIZE 64 > > Why? Is that related to Intel HW? No, it is for TCP/UDP tests. The max TCP/UDP length is 64KB, so one packet needs 32 pktmbufs. If we set DMA ring size to 32, it's possible that vhost cannot enqueue/dequeue one packet, as there may be some control packets mixed and DMA ring full will cause enqueue/dequeue the packet failed. So the recommend minimal DMA ring size is > 32. We can remove the macro, but add one comment to suggest the DMA ring size. > > > + > > static const char *valid_arguments[] = { > > ETH_VHOST_IFACE_ARG, > > ETH_VHOST_QUEUES_ARG, > > @@ -49,6 +59,8 @@ static const char *valid_arguments[] = { > > ETH_VHOST_LINEAR_BUF, > > ETH_VHOST_EXT_BUF, > > ETH_VHOST_LEGACY_OL_FLAGS, > > + ETH_VHOST_DMA_ARG, > > + ETH_VHOST_DMA_RING_SIZE, > > NULL > > }; > > > > @@ -80,8 +92,39 @@ struct vhost_queue { > > struct vhost_stats stats; > > int intr_enable; > > rte_spinlock_t intr_lock; > > + > > + /* Flag of enabling async data path */ > > + bool async_register; > > + /* DMA device ID */ > > + int16_t dma_id; > > + /** > > + * For a Rx queue, "txq" points to its peer Tx queue. > > + * For a Tx queue, "txq" is never used. > > + */ > > + struct vhost_queue *txq; > > + /* Array to keep DMA completed packets */ > > + struct rte_mbuf *cmpl_pkts[VHOST_MAX_PKT_BURST]; > > }; > > > > +struct dma_input_info { > > + int16_t dmas[RTE_MAX_QUEUES_PER_PORT * 2]; > > + uint16_t dma_ring_size; > > +}; > > + > > +static int16_t configured_dmas[RTE_DMADEV_DEFAULT_MAX]; > > +static int dma_count; > > + > > +/** > > + * By default, its Rx path to call rte_vhost_poll_enqueue_completed() for > enqueue operations. > > + * However, Rx function is never been called in testpmd "txonly" > > +mode, thus causing virtio > > + * cannot receive DMA completed packets. To make txonly mode work > > +correctly, we provide a > > + * command in testpmd to call rte_vhost_poll_enqueue_completed() in Tx > path. > > + * > > + * When set async_tx_poll_completed to true, Tx path calls > > +rte_vhost_poll_enqueue_completed(); > > + * otherwise, Rx path calls it. 
> > + */ > > +bool async_tx_poll_completed; > > + > > struct pmd_internal { > > rte_atomic32_t dev_attached; > > char *iface_name; > > @@ -94,6 +137,10 @@ struct pmd_internal { > > bool vlan_strip; > > bool rx_sw_csum; > > bool tx_sw_csum; > > + struct { > > + int16_t dma_id; > > + bool async_register; > > + } queue_dmas[RTE_MAX_QUEUES_PER_PORT * 2]; > > }; > > > > struct internal_list { > > @@ -124,6 +171,17 @@ struct rte_vhost_vring_state { > > > > static struct rte_vhost_vring_state *vring_states[RTE_MAX_ETHPORTS]; > > > > +static bool > > +dma_is_configured(int16_t dma_id) > > +{ > > + int i; > > + > > + for (i = 0; i < dma_count; i++) > > + if (configured_dmas[i] == dma_id) > > + return true; > > + return false; > > +} > > + > > static int > > vhost_dev_xstats_reset(struct rte_eth_dev *dev) > > { > > @@ -396,15 +454,27 @@ vhost_dev_rx_sw_csum(struct rte_mbuf *mbuf) > > mbuf->ol_flags |= RTE_MBUF_F_RX_L4_CKSUM_GOOD; > > } > > > > +static inline void > > +vhost_tx_free_completed(int vid, uint16_t virtqueue_id, int16_t dma_id, > > + struct rte_mbuf **pkts, uint16_t count) { > > + uint16_t i, ret; > > + > > + ret = rte_vhost_poll_enqueue_completed(vid, virtqueue_id, pkts, > count, dma_id, 0); > > + for (i = 0; likely(i < ret); i++) > > + rte_pktmbuf_free(pkts[i]); > > +} > > + > > static uint16_t > > eth_vhost_rx(void *q, struct rte_mbuf **bufs, uint16_t nb_bufs) > > { > > struct vhost_queue *r = q; > > uint16_t i, nb_rx = 0; > > uint16_t nb_receive = nb_bufs; > > + uint16_t nb_pkts, num; > > > > if (unlikely(rte_atomic32_read(&r->allow_queuing) == 0)) > > - return 0; > > + goto tx_poll; > > > > rte_atomic32_set(&r->while_queuing, 1); > > > > @@ -412,19 +482,32 @@ eth_vhost_rx(void *q, struct rte_mbuf **bufs, > uint16_t nb_bufs) > > goto out; > > > > /* Dequeue packets from guest TX queue */ > > - while (nb_receive) { > > - uint16_t nb_pkts; > > - uint16_t num = (uint16_t)RTE_MIN(nb_receive, > > - VHOST_MAX_PKT_BURST); > > - > > - nb_pkts = rte_vhost_dequeue_burst(r->vid, r->virtqueue_id, > > - r->mb_pool, &bufs[nb_rx], > > - num); > > - > > - nb_rx += nb_pkts; > > - nb_receive -= nb_pkts; > > - if (nb_pkts < num) > > - break; > > + if (!r->async_register) { > > + while (nb_receive) { > > + num = (uint16_t)RTE_MIN(nb_receive, > VHOST_MAX_PKT_BURST); > > + nb_pkts = rte_vhost_dequeue_burst(r->vid, r- > >virtqueue_id, > > + r->mb_pool, &bufs[nb_rx], > > + num); > > + > > + nb_rx += nb_pkts; > > + nb_receive -= nb_pkts; > > + if (nb_pkts < num) > > + break; > > + } > > + } else { > > + int nr_inflight; > > + > > + while (nb_receive) { > > + num = (uint16_t)RTE_MIN(nb_receive, > VHOST_MAX_PKT_BURST); > > + nb_pkts = rte_vhost_async_try_dequeue_burst(r- > >vid, r->virtqueue_id, > > + r->mb_pool, &bufs[nb_rx], > num, &nr_inflight, > > + r->dma_id, 0); > > + > > + nb_rx += nb_pkts; > > + nb_receive -= nb_pkts; > > + if (nb_pkts < num) > > + break; > > + } > > } > > > > r->stats.pkts += nb_rx; > > @@ -445,6 +528,17 @@ eth_vhost_rx(void *q, struct rte_mbuf **bufs, > uint16_t nb_bufs) > > out: > > rte_atomic32_set(&r->while_queuing, 0); > > > > +tx_poll: > > + /** > > + * Poll and free completed packets for the virtqueue of Tx queue. > > + * Note that we access Tx queue's virtqueue, which is protected > > + * by vring lock. 
> > + */ > > + if (r->txq->async_register && !async_tx_poll_completed && > > + rte_atomic32_read(&r->txq->allow_queuing) == 1) > > + vhost_tx_free_completed(r->vid, r->txq->virtqueue_id, r- > >txq->dma_id, > > + r->cmpl_pkts, VHOST_MAX_PKT_BURST); > > + > > return nb_rx; > > } > > > > @@ -456,6 +550,7 @@ eth_vhost_tx(void *q, struct rte_mbuf **bufs, > uint16_t nb_bufs) > > uint16_t nb_send = 0; > > uint64_t nb_bytes = 0; > > uint64_t nb_missed = 0; > > + uint16_t nb_pkts, num; > > > > if (unlikely(rte_atomic32_read(&r->allow_queuing) == 0)) > > return 0; > > @@ -486,31 +581,49 @@ eth_vhost_tx(void *q, struct rte_mbuf **bufs, > uint16_t nb_bufs) > > } > > > > /* Enqueue packets to guest RX queue */ > > - while (nb_send) { > > - uint16_t nb_pkts; > > - uint16_t num = (uint16_t)RTE_MIN(nb_send, > > - VHOST_MAX_PKT_BURST); > > + if (!r->async_register) { > > + while (nb_send) { > > + num = (uint16_t)RTE_MIN(nb_send, > VHOST_MAX_PKT_BURST); > > + nb_pkts = rte_vhost_enqueue_burst(r->vid, r- > >virtqueue_id, > > + &bufs[nb_tx], num); > > + > > + nb_tx += nb_pkts; > > + nb_send -= nb_pkts; > > + if (nb_pkts < num) > > + break; > > + } > > > > - nb_pkts = rte_vhost_enqueue_burst(r->vid, r->virtqueue_id, > > - &bufs[nb_tx], num); > > + for (i = 0; likely(i < nb_tx); i++) { > > + nb_bytes += bufs[i]->pkt_len; > > + rte_pktmbuf_free(bufs[i]); > > + } > > > > - nb_tx += nb_pkts; > > - nb_send -= nb_pkts; > > - if (nb_pkts < num) > > - break; > > - } > > + } else { > > + while (nb_send) { > > + num = (uint16_t)RTE_MIN(nb_send, > VHOST_MAX_PKT_BURST); > > + nb_pkts = rte_vhost_submit_enqueue_burst(r->vid, > r->virtqueue_id, > > + &bufs[nb_tx], num, r- > >dma_id, 0); > > + > > + nb_tx += nb_pkts; > > + nb_send -= nb_pkts; > > + if (nb_pkts < num) > > + break; > > + } > > > > - for (i = 0; likely(i < nb_tx); i++) > > - nb_bytes += bufs[i]->pkt_len; > > + for (i = 0; likely(i < nb_tx); i++) > > + nb_bytes += bufs[i]->pkt_len; > > > > - nb_missed = nb_bufs - nb_tx; > > + if (unlikely(async_tx_poll_completed)) { > > + vhost_tx_free_completed(r->vid, r->virtqueue_id, r- > >dma_id, r->cmpl_pkts, > > + VHOST_MAX_PKT_BURST); > > + } > > + } > > > > + nb_missed = nb_bufs - nb_tx; > > r->stats.pkts += nb_tx; > > r->stats.bytes += nb_bytes; > > r->stats.missed_pkts += nb_missed; > > > > - for (i = 0; likely(i < nb_tx); i++) > > - rte_pktmbuf_free(bufs[i]); > > out: > > rte_atomic32_set(&r->while_queuing, 0); > > > > @@ -798,6 +911,8 @@ queue_setup(struct rte_eth_dev *eth_dev, struct > pmd_internal *internal) > > vq->vid = internal->vid; > > vq->internal = internal; > > vq->port = eth_dev->data->port_id; > > + if (i < eth_dev->data->nb_tx_queues) > > + vq->txq = eth_dev->data->tx_queues[i]; > > } > > for (i = 0; i < eth_dev->data->nb_tx_queues; i++) { > > vq = eth_dev->data->tx_queues[i]; > > @@ -878,6 +993,30 @@ new_device(int vid) > > return 0; > > } > > > > +static inline void > > +async_clear_virtqueue(int vid, uint16_t virtqueue_id, int16_t dma_id) > > +{ > > + struct rte_mbuf *pkts[VHOST_MAX_PKT_BURST]; > > + uint16_t i, ret, nr_done = 0; > > + > > + if (vid == -1) > > + return; > > + > > + while (rte_vhost_async_get_inflight(vid, virtqueue_id) > 0) { > > + ret = rte_vhost_clear_queue(vid, virtqueue_id, pkts, > VHOST_MAX_PKT_BURST, dma_id, > > + 0); > > + for (i = 0; i < ret ; i++) > > + rte_pktmbuf_free(pkts[i]); > > + > > + nr_done += ret; > > + } > > + VHOST_LOG(INFO, "Completed %u pkts for vid-%u vring-%u\n", > nr_done, > > +vid, virtqueue_id); > > + > > + if (rte_vhost_async_channel_unregister(vid, 
virtqueue_id)) > > + VHOST_LOG(ERR, "Failed to unregister async for vid-%u > vring-%u\n", vid, > > + virtqueue_id); > > +} > > + > > static void > > destroy_device(int vid) > > { > > @@ -908,13 +1047,27 @@ destroy_device(int vid) > > vq = eth_dev->data->rx_queues[i]; > > if (!vq) > > continue; > > + if (vq->async_register) { > > + async_clear_virtqueue(vq->vid, vq- > >virtqueue_id, > > + vq->dma_id); > > + vq->async_register = false; > > + internal->queue_dmas[vq- > >virtqueue_id].async_register = false; > > + } > > vq->vid = -1; > > + > > } > > for (i = 0; i < eth_dev->data->nb_tx_queues; i++) { > > vq = eth_dev->data->tx_queues[i]; > > if (!vq) > > continue; > > + if (vq->async_register) { > > + async_clear_virtqueue(vq->vid, vq- > >virtqueue_id, > > + vq->dma_id); > > + vq->async_register = false; > > + internal->queue_dmas[vq- > >virtqueue_id].async_register = false; > > + } > > vq->vid = -1; > > + > > } > > } > > > > @@ -983,6 +1136,9 @@ vring_state_changed(int vid, uint16_t vring, int > enable) > > struct rte_vhost_vring_state *state; > > struct rte_eth_dev *eth_dev; > > struct internal_list *list; > > + struct vhost_queue *queue; > > + struct pmd_internal *internal; > > + int qid; > > char ifname[PATH_MAX]; > > > > rte_vhost_get_ifname(vid, ifname, sizeof(ifname)); @@ -1011,6 > > +1167,65 @@ vring_state_changed(int vid, uint16_t vring, int enable) > > > > update_queuing_status(eth_dev, false); > > > > + qid = vring / VIRTIO_QNUM; > > + if (vring % VIRTIO_QNUM == VIRTIO_RXQ) > > + queue = eth_dev->data->tx_queues[qid]; > > + else > > + queue = eth_dev->data->rx_queues[qid]; > > + > > + if (!queue) > > + goto skip; > > + > > + internal = eth_dev->data->dev_private; > > + > > + /* Register async data path for the queue assigned valid DMA device > */ > > + if (internal->queue_dmas[queue->virtqueue_id].dma_id == > INVALID_DMA_ID) > > + goto skip; > > + > > + if (enable && !queue->async_register) { > > + if (rte_vhost_async_channel_register_thread_unsafe(vid, > vring)) { > > + VHOST_LOG(ERR, "Failed to register async for vid-%u > vring-%u!\n", vid, > > + vring); > > + return -1; > > + } > > + > > + queue->async_register = true; > > + internal->queue_dmas[vring].async_register = true; > > + > > + VHOST_LOG(INFO, "Succeed to register async for vid-%u > vring-%u\n", vid, vring); > > + } > > + > > + if (!enable && queue->async_register) { > > + struct rte_mbuf *pkts[VHOST_MAX_PKT_BURST]; > > + uint16_t ret, i, nr_done = 0; > > + uint16_t dma_id = queue->dma_id; > > + > > + while (rte_vhost_async_get_inflight_thread_unsafe(vid, > vring) > 0) { > > + ret = rte_vhost_clear_queue_thread_unsafe(vid, > vring, pkts, > > + VHOST_MAX_PKT_BURST, dma_id, 0); > > + > > + for (i = 0; i < ret ; i++) > > + rte_pktmbuf_free(pkts[i]); > > + > > + nr_done += ret; > > + } > > + > > + VHOST_LOG(INFO, "Completed %u in-flight pkts for vid-%u > vring-%u\n", nr_done, vid, > > + vring); > > + > > + if (rte_vhost_async_channel_unregister_thread_unsafe(vid, > vring)) { > > + VHOST_LOG(ERR, "Failed to unregister async for vid- > %u vring-%u\n", vid, > > + vring); > > + return -1; > > + } > > + > > + queue->async_register = false; > > + internal->queue_dmas[vring].async_register = false; > > + > > + VHOST_LOG(INFO, "Succeed to unregister async for vid-%u > vring-%u\n", vid, vring); > > + } > > + > > +skip: > > VHOST_LOG(INFO, "vring%u is %s\n", > > vring, enable ? 
"enabled" : "disabled"); > > > > @@ -1159,6 +1374,12 @@ rte_eth_vhost_get_vid_from_port_id(uint16_t > port_id) > > return vid; > > } > > > > +void > > +rte_eth_vhost_async_tx_poll_completed(bool enable) { > > + async_tx_poll_completed = enable; > > +} > > + > > static int > > eth_dev_configure(struct rte_eth_dev *dev) > > { > > @@ -1220,6 +1441,7 @@ eth_dev_close(struct rte_eth_dev *dev) > > { > > struct pmd_internal *internal; > > struct internal_list *list; > > + struct vhost_queue *queue; > > unsigned int i, ret; > > > > if (rte_eal_process_type() != RTE_PROC_PRIMARY) @@ -1233,6 > +1455,25 > > @@ eth_dev_close(struct rte_eth_dev *dev) > > > > list = find_internal_resource(internal->iface_name); > > if (list) { > > + /* Make sure all in-flight packets are completed before > destroy vhost */ > > + if (dev->data->rx_queues) { > > + for (i = 0; i < dev->data->nb_rx_queues; i++) { > > + queue = dev->data->rx_queues[i]; > > + if (queue->async_register) > > + async_clear_virtqueue(queue->vid, > queue->virtqueue_id, > > + queue->dma_id); > > + } > > + } > > + > > + if (dev->data->tx_queues) { > > + for (i = 0; i < dev->data->nb_tx_queues; i++) { > > + queue = dev->data->tx_queues[i]; > > + if (queue->async_register) > > + async_clear_virtqueue(queue->vid, > queue->virtqueue_id, > > + queue->dma_id); > > + } > > + } > > + > > rte_vhost_driver_unregister(internal->iface_name); > > pthread_mutex_lock(&internal_list_lock); > > TAILQ_REMOVE(&internal_list, list, next); @@ -1267,6 > +1508,7 @@ > > eth_rx_queue_setup(struct rte_eth_dev *dev, uint16_t rx_queue_id, > > struct rte_mempool *mb_pool) > > { > > struct vhost_queue *vq; > > + struct pmd_internal *internal = dev->data->dev_private; > > > > vq = rte_zmalloc_socket(NULL, sizeof(struct vhost_queue), > > RTE_CACHE_LINE_SIZE, socket_id); > > @@ -1277,6 +1519,8 @@ eth_rx_queue_setup(struct rte_eth_dev *dev, > > uint16_t rx_queue_id, > > > > vq->mb_pool = mb_pool; > > vq->virtqueue_id = rx_queue_id * VIRTIO_QNUM + VIRTIO_TXQ; > > + vq->async_register = internal->queue_dmas[vq- > >virtqueue_id].async_register; > > + vq->dma_id = internal->queue_dmas[vq->virtqueue_id].dma_id; > > rte_spinlock_init(&vq->intr_lock); > > dev->data->rx_queues[rx_queue_id] = vq; > > > > @@ -1290,6 +1534,7 @@ eth_tx_queue_setup(struct rte_eth_dev *dev, > uint16_t tx_queue_id, > > const struct rte_eth_txconf *tx_conf __rte_unused) > > { > > struct vhost_queue *vq; > > + struct pmd_internal *internal = dev->data->dev_private; > > > > vq = rte_zmalloc_socket(NULL, sizeof(struct vhost_queue), > > RTE_CACHE_LINE_SIZE, socket_id); > > @@ -1299,6 +1544,8 @@ eth_tx_queue_setup(struct rte_eth_dev *dev, > uint16_t tx_queue_id, > > } > > > > vq->virtqueue_id = tx_queue_id * VIRTIO_QNUM + VIRTIO_RXQ; > > + vq->async_register = internal->queue_dmas[vq- > >virtqueue_id].async_register; > > + vq->dma_id = internal->queue_dmas[vq->virtqueue_id].dma_id; > > rte_spinlock_init(&vq->intr_lock); > > dev->data->tx_queues[tx_queue_id] = vq; > > > > @@ -1509,13 +1756,14 @@ static const struct eth_dev_ops ops = { > > static int > > eth_dev_vhost_create(struct rte_vdev_device *dev, char *iface_name, > > int16_t queues, const unsigned int numa_node, uint64_t flags, > > - uint64_t disable_flags) > > + uint64_t disable_flags, struct dma_input_info *dma_input) > > { > > const char *name = rte_vdev_device_name(dev); > > struct rte_eth_dev_data *data; > > struct pmd_internal *internal = NULL; > > struct rte_eth_dev *eth_dev = NULL; > > struct rte_ether_addr *eth_addr = NULL; > > + int i; > > > > 
VHOST_LOG(INFO, "Creating VHOST-USER backend on numa > socket %u\n", > > numa_node); > > @@ -1564,6 +1812,12 @@ eth_dev_vhost_create(struct rte_vdev_device > *dev, char *iface_name, > > eth_dev->rx_pkt_burst = eth_vhost_rx; > > eth_dev->tx_pkt_burst = eth_vhost_tx; > > > > + for (i = 0; i < RTE_MAX_QUEUES_PER_PORT * 2; i++) { > > + /* Invalid DMA ID indicates the queue does not want to > enable async data path */ > > + internal->queue_dmas[i].dma_id = dma_input->dmas[i]; > > + internal->queue_dmas[i].async_register = false; > > + } > > + > > rte_eth_dev_probing_finish(eth_dev); > > return 0; > > > > @@ -1603,6 +1857,155 @@ open_int(const char *key __rte_unused, const > char *value, void *extra_args) > > return 0; > > } > > > > +static int > > +init_dma(int16_t dma_id, uint16_t ring_size) { > > + struct rte_dma_info info; > > + struct rte_dma_conf dev_config = { .nb_vchans = 1 }; > > + struct rte_dma_vchan_conf qconf = { > > + .direction = RTE_DMA_DIR_MEM_TO_MEM, > > + }; > > + int ret = 0; > > + > > + if (dma_is_configured(dma_id)) > > + goto out; > > + > > + if (rte_dma_info_get(dma_id, &info) != 0) { > > + VHOST_LOG(ERR, "dma %u get info failed\n", dma_id); > > + ret = -1; > > + goto out; > > + } > > + > > + if (info.max_vchans < 1) { > > + VHOST_LOG(ERR, "No channels available on dma %d\n", > dma_id); > > + ret = -1; > > + goto out; > > + } > > + > > + if (rte_dma_configure(dma_id, &dev_config) != 0) { > > + VHOST_LOG(ERR, "dma %u configure failed\n", dma_id); > > + ret = -1; > > + goto out; > > + } > > + > > + rte_dma_info_get(dma_id, &info); > > + if (info.nb_vchans != 1) { > > + VHOST_LOG(ERR, "dma %u has no queues\n", dma_id); > > + ret = -1; > > + goto out; > > + } > > + > > + ring_size = RTE_MAX(ring_size, MINIMUM_DMA_RING_SIZE); > > + ring_size = RTE_MAX(ring_size, info.min_desc); > > + qconf.nb_desc = RTE_MIN(ring_size, info.max_desc); > > + if (rte_dma_vchan_setup(dma_id, 0, &qconf) != 0) { > > + VHOST_LOG(ERR, "dma %u queue setup failed\n", dma_id); > > + ret = -1; > > + goto out; > > + } > > + > > + if (rte_dma_start(dma_id) != 0) { > > + VHOST_LOG(ERR, "dma %u start failed\n", dma_id); > > + ret = -1; > > + goto out; > > + } > > + > > + configured_dmas[dma_count++] = dma_id; > > + > > +out: > > + return ret; > > +} > > + > > +static int > > +open_dma(const char *key __rte_unused, const char *value, void > > +*extra_args) { > > + struct dma_input_info *dma_input = extra_args; > > + char *input = strndup(value, strlen(value) + 1); > > + char *addrs = input; > > + char *ptrs[2]; > > + char *start, *end, *substr; > > + uint16_t qid, virtqueue_id; > > + int16_t dma_id; > > + int i, ret = 0; > > + > > + while (isblank(*addrs)) > > + addrs++; > > + if (*addrs == '\0') { > > + VHOST_LOG(ERR, "No input DMA addresses\n"); > > + ret = -1; > > + goto out; > > + } > > + > > + /* process DMA devices within bracket. 
*/ > > + addrs++; > > + substr = strtok(addrs, ";]"); > > + if (!substr) { > > + VHOST_LOG(ERR, "No input DMA addresse\n"); > > + ret = -1; > > + goto out; > > + } > > + > > + do { > > + rte_strsplit(substr, strlen(substr), ptrs, 2, '@'); > > + > > + char *txq, *rxq; > > + bool is_txq; > > + > > + txq = strstr(ptrs[0], "txq"); > > + rxq = strstr(ptrs[0], "rxq"); > > + if (txq == NULL && rxq == NULL) { > > + VHOST_LOG(ERR, "Illegal queue\n"); > > + ret = -1; > > + goto out; > > + } else if (txq) { > > + is_txq = true; > > + start = txq; > > + } else { > > + is_txq = false; > > + start = rxq; > > + } > > + > > + start += 3; > > + qid = strtol(start, &end, 0); > > + if (end == start) { > > + VHOST_LOG(ERR, "No input queue ID\n"); > > + ret = -1; > > + goto out; > > + } > > + > > + virtqueue_id = is_txq ? qid * 2 + VIRTIO_RXQ : qid * 2 + > > +VIRTIO_TXQ; > > + > > + dma_id = rte_dma_get_dev_id_by_name(ptrs[1]); > > + if (dma_id < 0) { > > + VHOST_LOG(ERR, "Fail to find DMA device %s.\n", > ptrs[1]); > > + ret = -1; > > + goto out; > > + } > > + > > + ret = init_dma(dma_id, dma_input->dma_ring_size); > > + if (ret != 0) { > > + VHOST_LOG(ERR, "Fail to initialize DMA %u\n", > dma_id); > > + ret = -1; > > + break; > > + } > > + > > + dma_input->dmas[virtqueue_id] = dma_id; > > + > > + substr = strtok(NULL, ";]"); > > + } while (substr); > > + > > + for (i = 0; i < dma_count; i++) { > > + if (rte_vhost_async_dma_configure(configured_dmas[i], 0) < > 0) { > > + VHOST_LOG(ERR, "Fail to configure DMA %u to > vhost\n", configured_dmas[i]); > > + ret = -1; > > + } > > + } > > + > > +out: > > + free(input); > > + return ret; > > +} > > + > > static int > > rte_pmd_vhost_probe(struct rte_vdev_device *dev) > > { > > @@ -1621,6 +2024,10 @@ rte_pmd_vhost_probe(struct rte_vdev_device > *dev) > > int legacy_ol_flags = 0; > > struct rte_eth_dev *eth_dev; > > const char *name = rte_vdev_device_name(dev); > > + struct dma_input_info dma_input; > > + > > + memset(dma_input.dmas, INVALID_DMA_ID, > sizeof(dma_input.dmas)); > > + dma_input.dma_ring_size = DEFAULT_DMA_RING_SIZE; > > > > VHOST_LOG(INFO, "Initializing pmd_vhost for %s\n", name); > > > > @@ -1736,6 +2143,43 @@ rte_pmd_vhost_probe(struct rte_vdev_device > *dev) > > goto out_free; > > } > > > > + if (rte_kvargs_count(kvlist, ETH_VHOST_DMA_RING_SIZE) == 1) { > > + if (rte_kvargs_count(kvlist, ETH_VHOST_DMA_ARG) == 1) { > > + ret = rte_kvargs_process(kvlist, > ETH_VHOST_DMA_RING_SIZE, > > + &open_int, > &dma_input.dma_ring_size); > > + if (ret < 0) > > + goto out_free; > > + > > + if (!rte_is_power_of_2(dma_input.dma_ring_size)) { > > + dma_input.dma_ring_size = > rte_align32pow2(dma_input.dma_ring_size); > > + VHOST_LOG(INFO, "Convert dma_ring_size to > the power of two %u\n", > > + dma_input.dma_ring_size); > > + } > > + } else { > > + VHOST_LOG(WARNING, "%s is not specified, skip to > parse %s\n", > > + ETH_VHOST_DMA_ARG, > ETH_VHOST_DMA_RING_SIZE); > > + } > > + } > > + > > + if (rte_kvargs_count(kvlist, ETH_VHOST_DMA_ARG) == 1) { > > + if (rte_kvargs_count(kvlist, ETH_VHOST_DMA_RING_SIZE) == > 0) > > + VHOST_LOG(INFO, "Use default dma ring size %d\n", > > +DEFAULT_DMA_RING_SIZE); > > + > > + ret = rte_kvargs_process(kvlist, ETH_VHOST_DMA_ARG, > > + &open_dma, &dma_input); > > + if (ret < 0) { > > + VHOST_LOG(ERR, "Failed to parse %s\n", > ETH_VHOST_DMA_ARG); > > + goto out_free; > > + } > > + > > + flags |= RTE_VHOST_USER_ASYNC_COPY; > > + /** > > + * Don't support live migration when enable > > + * DMA acceleration. 
> > + */ > > + disable_flags |= (1ULL << VHOST_F_LOG_ALL); > > + } > > + > > if (legacy_ol_flags == 0) > > flags |= RTE_VHOST_USER_NET_COMPLIANT_OL_FLAGS; > > > > @@ -1743,7 +2187,7 @@ rte_pmd_vhost_probe(struct rte_vdev_device > *dev) > > dev->device.numa_node = rte_socket_id(); > > > > ret = eth_dev_vhost_create(dev, iface_name, queues, > > - dev->device.numa_node, flags, > disable_flags); > > + dev->device.numa_node, flags, disable_flags, > &dma_input); > > if (ret == -1) > > VHOST_LOG(ERR, "Failed to create %s\n", name); > > > > @@ -1787,4 +2231,6 @@ RTE_PMD_REGISTER_PARAM_STRING(net_vhost, > > "postcopy-support=<0|1> " > > "tso=<0|1> " > > "linear-buffer=<0|1> " > > - "ext-buffer=<0|1>"); > > + "ext-buffer=<0|1> " > > + "dma-ring-size=<int> " > > + "dmas=[txq0@dma_addr;rxq0@dma_addr]"); > > diff --git a/drivers/net/vhost/rte_eth_vhost.h > > b/drivers/net/vhost/rte_eth_vhost.h > > index 0e68b9f668..146c98803d 100644 > > --- a/drivers/net/vhost/rte_eth_vhost.h > > +++ b/drivers/net/vhost/rte_eth_vhost.h > > @@ -52,6 +52,21 @@ int rte_eth_vhost_get_queue_event(uint16_t port_id, > > */ > > int rte_eth_vhost_get_vid_from_port_id(uint16_t port_id); > > > > +/** > > + * @warning > > + * @b EXPERIMENTAL: this API may change, or be removed, without prior > > +notice > > + * > > + * By default, rte_vhost_poll_enqueue_completed() is called in Rx path. > > + * This function enables Tx path, rather than Rx path, to poll > > +completed > > + * packets for vhost async enqueue operations. Note that virtio may > > +never > > + * receive DMA completed packets if there are no more Tx operations. > > + * > > + * @param enable > > + * True indicates Tx path to call rte_vhost_poll_enqueue_completed(). > > + */ > > +__rte_experimental > > +void rte_eth_vhost_async_tx_poll_completed(bool enable); > > + > > #ifdef __cplusplus > > } > > #endif > > diff --git a/drivers/net/vhost/version.map > > b/drivers/net/vhost/version.map index e42c89f1eb..0a40441227 100644 > > --- a/drivers/net/vhost/version.map > > +++ b/drivers/net/vhost/version.map > > @@ -6,3 +6,10 @@ DPDK_23 { > > > > local: *; > > }; > > + > > +EXPERIMENTAL { > > + global: > > + > > + # added in 22.11 > > + rte_eth_vhost_async_tx_poll_completed; > > I'm not in favor of adding driver-specific API. > IMHO, you should just decide whether doing completion on Tx or Rx path > and stick with it. > > Doing completion on the Tx path not being reliable (i.e. if not other packets > are sent, last enqueued packets are not completed), I guess only doing > completion in Rx path is the right thing to do.

It's good to know your thoughts. However, if we remove the driver-specific API and only call rte_vhost_poll_enqueue_completed() in the Rx path, testpmd txonly mode cannot work with DMA-accelerated enqueue, as nothing will ever call rte_vhost_poll_enqueue_completed(). Any thoughts?

Thanks,
Jiayu
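P.S. To make the txonly point concrete: below is a minimal sketch of the single call a send-only application would make with the API proposed in this patch. The function name and header are the ones the patch adds; the wrapper function (configure_tx_side_completion_polling) is hypothetical boilerplate for illustration only, not code from the patch or from testpmd.

```c
#include <stdbool.h>

#include <rte_eth_vhost.h>	/* declares rte_eth_vhost_async_tx_poll_completed() (added by this patch) */

/*
 * A send-only application (testpmd txonly is the motivating case) never
 * calls the vhost PMD Rx burst function, so the default behaviour of
 * polling DMA completions from the Rx path would leave enqueued packets
 * uncompleted forever. Moving completion polling to the Tx path is what
 * keeps the async enqueue path progressing for such an application.
 */
static void
configure_tx_side_completion_polling(void)
{
	/* Default is false: completions are polled from the Rx path. */
	rte_eth_vhost_async_tx_poll_completed(true);
}
```

If the API is dropped and completion is hard-coded to the Rx path, this class of application has no way to drain completions.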
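And, for completeness, the arithmetic behind the suggested minimum DMA ring size, written out as a small compile-time check. The 2 KB mbuf data room is an assumption about the typical test setup, not something the patch enforces:

```c
/*
 * A 64 KB TCP/UDP packet split into mbufs with a 2 KB data room needs
 * 64 KB / 2 KB = 32 descriptors, so a 32-entry DMA ring can be exhausted
 * by a single large packet once any control packets are interleaved.
 * Hence the recommendation to size the ring strictly above 32 (the patch
 * suggests 64 as the minimum).
 */
#define MAX_TCP_UDP_PKT_LEN	(64 * 1024)
#define ASSUMED_MBUF_DATA_ROOM	(2 * 1024)
#define DESC_PER_LARGE_PKT	(MAX_TCP_UDP_PKT_LEN / ASSUMED_MBUF_DATA_ROOM)

_Static_assert(DESC_PER_LARGE_PKT == 32, "one 64KB packet spans 32 mbufs");
```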