The vhost asynchronous data path offloads packet copies from the CPU
to a DMA engine. As a result, large packet copies can be accelerated
by the DMA engine, and vhost can free CPU cycles for higher-level
functions.

In this patch, we enable the asynchronous data path for the vhost PMD.
The asynchronous data path is enabled per Tx/Rx queue, and users need
to specify the DMA device used by each Tx/Rx queue. A Tx/Rx queue can
only use one DMA device, but one DMA device can be shared among
multiple Tx/Rx queues of different vhost PMD ports.

Two PMD parameters are added:
- dmas: specify the DMA device used by a Tx/Rx queue.
        (Default: no queues enable asynchronous data path)
- dma-ring-size: DMA ring size.
        (Default: 4096. Non-power-of-2 values are rounded up to the
        next power of two; the minimum size is 64.)

Here is an example:
--vdev 'eth_vhost0,iface=./s0,dmas=[txq0@0000:00.01.0;rxq0@0000:00.01.1],dma-ring-size=4096'
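
The same DMA device can also be shared by queues of different vhost
PMD ports. For illustration only (the DMA addresses below are just
examples), two vdevs could reference the same DMA channel:
--vdev 'eth_vhost0,iface=./s0,dmas=[txq0@0000:00.01.0]'
--vdev 'eth_vhost1,iface=./s1,dmas=[rxq0@0000:00.01.0]'

Because the Rx path is never called in testpmd "txonly" mode, this
patch also adds a testpmd command to poll and free DMA completed
packets in the Tx path:
testpmd> async_vhost tx poll completed on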

Signed-off-by: Jiayu Hu <jiayu...@intel.com>
Signed-off-by: Yuan Wang <yuanx.w...@intel.com>
Signed-off-by: Wenwu Ma <wenwux...@intel.com>

---
v5:
- add error log of testpmd cmd

v4:
- add txq allow_queuing check on poll completed
- unregister dma channel in destroy_device
- add dma ring size minimum limit
- fix several code style and logging issues

v3:
- add the API to version.map

v2:
- add missing file
- hide async_tx_poll_completed
- change default DMA ring size to 4096

---
 drivers/net/vhost/meson.build     |   1 +
 drivers/net/vhost/rte_eth_vhost.c | 512 ++++++++++++++++++++++++++++--
 drivers/net/vhost/rte_eth_vhost.h |  15 +
 drivers/net/vhost/version.map     |   7 +
 drivers/net/vhost/vhost_testpmd.c |  67 ++++
 5 files changed, 569 insertions(+), 33 deletions(-)
 create mode 100644 drivers/net/vhost/vhost_testpmd.c

diff --git a/drivers/net/vhost/meson.build b/drivers/net/vhost/meson.build
index f481a3a4b8..22a0ab3a58 100644
--- a/drivers/net/vhost/meson.build
+++ b/drivers/net/vhost/meson.build
@@ -9,4 +9,5 @@ endif
 
 deps += 'vhost'
 sources = files('rte_eth_vhost.c')
+testpmd_sources = files('vhost_testpmd.c')
 headers = files('rte_eth_vhost.h')
diff --git a/drivers/net/vhost/rte_eth_vhost.c b/drivers/net/vhost/rte_eth_vhost.c
index b152279fac..80cd9c8d92 100644
--- a/drivers/net/vhost/rte_eth_vhost.c
+++ b/drivers/net/vhost/rte_eth_vhost.c
@@ -7,6 +7,7 @@
 #include <pthread.h>
 #include <stdbool.h>
 #include <sys/epoll.h>
+#include <ctype.h>
 
 #include <rte_mbuf.h>
 #include <ethdev_driver.h>
@@ -18,6 +19,8 @@
 #include <rte_kvargs.h>
 #include <rte_vhost.h>
 #include <rte_spinlock.h>
+#include <rte_vhost_async.h>
+#include <rte_dmadev.h>
 
 #include "rte_eth_vhost.h"
 
@@ -37,8 +40,15 @@ enum {VIRTIO_RXQ, VIRTIO_TXQ, VIRTIO_QNUM};
 #define ETH_VHOST_LINEAR_BUF           "linear-buffer"
 #define ETH_VHOST_EXT_BUF              "ext-buffer"
 #define ETH_VHOST_LEGACY_OL_FLAGS      "legacy-ol-flags"
+#define ETH_VHOST_DMA_ARG              "dmas"
+#define ETH_VHOST_DMA_RING_SIZE                "dma-ring-size"
 #define VHOST_MAX_PKT_BURST 32
 
+#define INVALID_DMA_ID         -1
+#define DEFAULT_DMA_RING_SIZE  4096
+/* Minimum packet size is 64, dma ring size should be greater than 32 */
+#define MINIMUM_DMA_RING_SIZE  64
+
 static const char *valid_arguments[] = {
        ETH_VHOST_IFACE_ARG,
        ETH_VHOST_QUEUES_ARG,
@@ -49,6 +59,8 @@ static const char *valid_arguments[] = {
        ETH_VHOST_LINEAR_BUF,
        ETH_VHOST_EXT_BUF,
        ETH_VHOST_LEGACY_OL_FLAGS,
+       ETH_VHOST_DMA_ARG,
+       ETH_VHOST_DMA_RING_SIZE,
        NULL
 };
 
@@ -80,8 +92,39 @@ struct vhost_queue {
        struct vhost_stats stats;
        int intr_enable;
        rte_spinlock_t intr_lock;
+
+       /* Flag of enabling async data path */
+       bool async_register;
+       /* DMA device ID */
+       int16_t dma_id;
+       /**
+        * For a Rx queue, "txq" points to its peer Tx queue.
+        * For a Tx queue, "txq" is never used.
+        */
+       struct vhost_queue *txq;
+       /* Array to keep DMA completed packets */
+       struct rte_mbuf *cmpl_pkts[VHOST_MAX_PKT_BURST];
 };
 
+struct dma_input_info {
+       int16_t dmas[RTE_MAX_QUEUES_PER_PORT * 2];
+       uint16_t dma_ring_size;
+};
+
+static int16_t configured_dmas[RTE_DMADEV_DEFAULT_MAX];
+static int dma_count;
+
+/**
+ * By default, the Rx path calls rte_vhost_poll_enqueue_completed() for enqueue operations.
+ * However, the Rx function is never called in testpmd "txonly" mode, so virtio cannot
+ * receive DMA completed packets. To make txonly mode work correctly, we provide a
+ * command in testpmd to call rte_vhost_poll_enqueue_completed() in the Tx path.
+ *
+ * When async_tx_poll_completed is set to true, the Tx path calls
+ * rte_vhost_poll_enqueue_completed(); otherwise, the Rx path calls it.
+ */
+bool async_tx_poll_completed;
+
 struct pmd_internal {
        rte_atomic32_t dev_attached;
        char *iface_name;
@@ -94,6 +137,10 @@ struct pmd_internal {
        bool vlan_strip;
        bool rx_sw_csum;
        bool tx_sw_csum;
+       struct {
+               int16_t dma_id;
+               bool async_register;
+       } queue_dmas[RTE_MAX_QUEUES_PER_PORT * 2];
 };
 
 struct internal_list {
@@ -124,6 +171,17 @@ struct rte_vhost_vring_state {
 
 static struct rte_vhost_vring_state *vring_states[RTE_MAX_ETHPORTS];
 
+static bool
+dma_is_configured(int16_t dma_id)
+{
+       int i;
+
+       for (i = 0; i < dma_count; i++)
+               if (configured_dmas[i] == dma_id)
+                       return true;
+       return false;
+}
+
 static int
 vhost_dev_xstats_reset(struct rte_eth_dev *dev)
 {
@@ -396,15 +454,27 @@ vhost_dev_rx_sw_csum(struct rte_mbuf *mbuf)
        mbuf->ol_flags |= RTE_MBUF_F_RX_L4_CKSUM_GOOD;
 }
 
+static inline void
+vhost_tx_free_completed(int vid, uint16_t virtqueue_id, int16_t dma_id,
+               struct rte_mbuf **pkts, uint16_t count)
+{
+       uint16_t i, ret;
+
+       ret = rte_vhost_poll_enqueue_completed(vid, virtqueue_id, pkts, count, dma_id, 0);
+       for (i = 0; likely(i < ret); i++)
+               rte_pktmbuf_free(pkts[i]);
+}
+
 static uint16_t
 eth_vhost_rx(void *q, struct rte_mbuf **bufs, uint16_t nb_bufs)
 {
        struct vhost_queue *r = q;
        uint16_t i, nb_rx = 0;
        uint16_t nb_receive = nb_bufs;
+       uint16_t nb_pkts, num;
 
        if (unlikely(rte_atomic32_read(&r->allow_queuing) == 0))
-               return 0;
+               goto tx_poll;
 
        rte_atomic32_set(&r->while_queuing, 1);
 
@@ -412,19 +482,32 @@ eth_vhost_rx(void *q, struct rte_mbuf **bufs, uint16_t nb_bufs)
                goto out;
 
        /* Dequeue packets from guest TX queue */
-       while (nb_receive) {
-               uint16_t nb_pkts;
-               uint16_t num = (uint16_t)RTE_MIN(nb_receive,
-                                                VHOST_MAX_PKT_BURST);
-
-               nb_pkts = rte_vhost_dequeue_burst(r->vid, r->virtqueue_id,
-                                                 r->mb_pool, &bufs[nb_rx],
-                                                 num);
-
-               nb_rx += nb_pkts;
-               nb_receive -= nb_pkts;
-               if (nb_pkts < num)
-                       break;
+       if (!r->async_register) {
+               while (nb_receive) {
+                       num = (uint16_t)RTE_MIN(nb_receive, VHOST_MAX_PKT_BURST);
+                       nb_pkts = rte_vhost_dequeue_burst(r->vid, r->virtqueue_id,
+                                               r->mb_pool, &bufs[nb_rx],
+                                               num);
+
+                       nb_rx += nb_pkts;
+                       nb_receive -= nb_pkts;
+                       if (nb_pkts < num)
+                               break;
+               }
+       } else {
+               int nr_inflight;
+
+               while (nb_receive) {
+                       num = (uint16_t)RTE_MIN(nb_receive, VHOST_MAX_PKT_BURST);
+                       nb_pkts = rte_vhost_async_try_dequeue_burst(r->vid, r->virtqueue_id,
+                                               r->mb_pool, &bufs[nb_rx], num, &nr_inflight,
+                                               r->dma_id, 0);
+
+                       nb_rx += nb_pkts;
+                       nb_receive -= nb_pkts;
+                       if (nb_pkts < num)
+                               break;
+               }
        }
 
        r->stats.pkts += nb_rx;
@@ -445,6 +528,17 @@ eth_vhost_rx(void *q, struct rte_mbuf **bufs, uint16_t nb_bufs)
 out:
        rte_atomic32_set(&r->while_queuing, 0);
 
+tx_poll:
+       /**
+        * Poll and free completed packets for the virtqueue of Tx queue.
+        * Note that we access Tx queue's virtqueue, which is protected
+        * by vring lock.
+        */
+       if (r->txq->async_register && !async_tx_poll_completed &&
+                       rte_atomic32_read(&r->txq->allow_queuing) == 1)
+               vhost_tx_free_completed(r->vid, r->txq->virtqueue_id, r->txq->dma_id,
+                               r->cmpl_pkts, VHOST_MAX_PKT_BURST);
+
        return nb_rx;
 }
 
@@ -456,6 +550,7 @@ eth_vhost_tx(void *q, struct rte_mbuf **bufs, uint16_t nb_bufs)
        uint16_t nb_send = 0;
        uint64_t nb_bytes = 0;
        uint64_t nb_missed = 0;
+       uint16_t nb_pkts, num;
 
        if (unlikely(rte_atomic32_read(&r->allow_queuing) == 0))
                return 0;
@@ -486,31 +581,49 @@ eth_vhost_tx(void *q, struct rte_mbuf **bufs, uint16_t nb_bufs)
        }
 
        /* Enqueue packets to guest RX queue */
-       while (nb_send) {
-               uint16_t nb_pkts;
-               uint16_t num = (uint16_t)RTE_MIN(nb_send,
-                                                VHOST_MAX_PKT_BURST);
+       if (!r->async_register) {
+               while (nb_send) {
+                       num = (uint16_t)RTE_MIN(nb_send, VHOST_MAX_PKT_BURST);
+                       nb_pkts = rte_vhost_enqueue_burst(r->vid, r->virtqueue_id,
+                                               &bufs[nb_tx], num);
+
+                       nb_tx += nb_pkts;
+                       nb_send -= nb_pkts;
+                       if (nb_pkts < num)
+                               break;
+               }
 
-               nb_pkts = rte_vhost_enqueue_burst(r->vid, r->virtqueue_id,
-                                                 &bufs[nb_tx], num);
+               for (i = 0; likely(i < nb_tx); i++) {
+                       nb_bytes += bufs[i]->pkt_len;
+                       rte_pktmbuf_free(bufs[i]);
+               }
 
-               nb_tx += nb_pkts;
-               nb_send -= nb_pkts;
-               if (nb_pkts < num)
-                       break;
-       }
+       } else {
+               while (nb_send) {
+                       num = (uint16_t)RTE_MIN(nb_send, VHOST_MAX_PKT_BURST);
+                       nb_pkts = rte_vhost_submit_enqueue_burst(r->vid, r->virtqueue_id,
+                                                       &bufs[nb_tx], num, r->dma_id, 0);
+
+                       nb_tx += nb_pkts;
+                       nb_send -= nb_pkts;
+                       if (nb_pkts < num)
+                               break;
+               }
 
-       for (i = 0; likely(i < nb_tx); i++)
-               nb_bytes += bufs[i]->pkt_len;
+               for (i = 0; likely(i < nb_tx); i++)
+                       nb_bytes += bufs[i]->pkt_len;
 
-       nb_missed = nb_bufs - nb_tx;
+               if (unlikely(async_tx_poll_completed)) {
+                       vhost_tx_free_completed(r->vid, r->virtqueue_id, r->dma_id, r->cmpl_pkts,
+                                       VHOST_MAX_PKT_BURST);
+               }
+       }
 
+       nb_missed = nb_bufs - nb_tx;
        r->stats.pkts += nb_tx;
        r->stats.bytes += nb_bytes;
        r->stats.missed_pkts += nb_missed;
 
-       for (i = 0; likely(i < nb_tx); i++)
-               rte_pktmbuf_free(bufs[i]);
 out:
        rte_atomic32_set(&r->while_queuing, 0);
 
@@ -798,6 +911,8 @@ queue_setup(struct rte_eth_dev *eth_dev, struct pmd_internal *internal)
                vq->vid = internal->vid;
                vq->internal = internal;
                vq->port = eth_dev->data->port_id;
+               if (i < eth_dev->data->nb_tx_queues)
+                       vq->txq = eth_dev->data->tx_queues[i];
        }
        for (i = 0; i < eth_dev->data->nb_tx_queues; i++) {
                vq = eth_dev->data->tx_queues[i];
@@ -878,6 +993,30 @@ new_device(int vid)
        return 0;
 }
 
+static inline void
+async_clear_virtqueue(int vid, uint16_t virtqueue_id, int16_t dma_id)
+{
+       struct rte_mbuf *pkts[VHOST_MAX_PKT_BURST];
+       uint16_t i, ret, nr_done = 0;
+
+       if (vid == -1)
+               return;
+
+       while (rte_vhost_async_get_inflight(vid, virtqueue_id) > 0) {
+               ret = rte_vhost_clear_queue(vid, virtqueue_id, pkts, VHOST_MAX_PKT_BURST, dma_id,
+                               0);
+               for (i = 0; i < ret ; i++)
+                       rte_pktmbuf_free(pkts[i]);
+
+               nr_done += ret;
+       }
+       VHOST_LOG(INFO, "Completed %u pkts for vid-%u vring-%u\n", nr_done, vid, virtqueue_id);
+
+       if (rte_vhost_async_channel_unregister(vid, virtqueue_id))
+               VHOST_LOG(ERR, "Failed to unregister async for vid-%u vring-%u\n", vid,
+                               virtqueue_id);
+}
+
 static void
 destroy_device(int vid)
 {
@@ -908,13 +1047,27 @@ destroy_device(int vid)
                        vq = eth_dev->data->rx_queues[i];
                        if (!vq)
                                continue;
+                       if (vq->async_register) {
+                               async_clear_virtqueue(vq->vid, vq->virtqueue_id,
+                                                       vq->dma_id);
+                               vq->async_register = false;
+                               internal->queue_dmas[vq->virtqueue_id].async_register = false;
+                       }
                        vq->vid = -1;
+
                }
                for (i = 0; i < eth_dev->data->nb_tx_queues; i++) {
                        vq = eth_dev->data->tx_queues[i];
                        if (!vq)
                                continue;
+                       if (vq->async_register) {
+                               async_clear_virtqueue(vq->vid, vq->virtqueue_id,
+                                                       vq->dma_id);
+                               vq->async_register = false;
+                               internal->queue_dmas[vq->virtqueue_id].async_register = false;
+                       }
                        vq->vid = -1;
+
                }
        }
 
@@ -983,6 +1136,9 @@ vring_state_changed(int vid, uint16_t vring, int enable)
        struct rte_vhost_vring_state *state;
        struct rte_eth_dev *eth_dev;
        struct internal_list *list;
+       struct vhost_queue *queue;
+       struct pmd_internal *internal;
+       int qid;
        char ifname[PATH_MAX];
 
        rte_vhost_get_ifname(vid, ifname, sizeof(ifname));
@@ -1011,6 +1167,65 @@ vring_state_changed(int vid, uint16_t vring, int enable)
 
        update_queuing_status(eth_dev, false);
 
+       qid = vring / VIRTIO_QNUM;
+       if (vring % VIRTIO_QNUM == VIRTIO_RXQ)
+               queue = eth_dev->data->tx_queues[qid];
+       else
+               queue = eth_dev->data->rx_queues[qid];
+
+       if (!queue)
+               goto skip;
+
+       internal = eth_dev->data->dev_private;
+
+       /* Register the async data path for a queue assigned a valid DMA device */
+       if (internal->queue_dmas[queue->virtqueue_id].dma_id == INVALID_DMA_ID)
+               goto skip;
+
+       if (enable && !queue->async_register) {
+               if (rte_vhost_async_channel_register_thread_unsafe(vid, vring)) {
+                       VHOST_LOG(ERR, "Failed to register async for vid-%u vring-%u!\n", vid,
+                                       vring);
+                       return -1;
+               }
+
+               queue->async_register = true;
+               internal->queue_dmas[vring].async_register = true;
+
+               VHOST_LOG(INFO, "Succeed to register async for vid-%u vring-%u\n", vid, vring);
+       }
+
+       if (!enable && queue->async_register) {
+               struct rte_mbuf *pkts[VHOST_MAX_PKT_BURST];
+               uint16_t ret, i, nr_done = 0;
+               uint16_t dma_id = queue->dma_id;
+
+               while (rte_vhost_async_get_inflight_thread_unsafe(vid, vring) > 0) {
+                       ret = rte_vhost_clear_queue_thread_unsafe(vid, vring, pkts,
+                                       VHOST_MAX_PKT_BURST, dma_id, 0);
+
+                       for (i = 0; i < ret ; i++)
+                               rte_pktmbuf_free(pkts[i]);
+
+                       nr_done += ret;
+               }
+
+               VHOST_LOG(INFO, "Completed %u in-flight pkts for vid-%u vring-%u\n", nr_done, vid,
+                               vring);
+
+               if (rte_vhost_async_channel_unregister_thread_unsafe(vid, vring)) {
+                       VHOST_LOG(ERR, "Failed to unregister async for vid-%u vring-%u\n", vid,
+                                       vring);
+                       return -1;
+               }
+
+               queue->async_register = false;
+               internal->queue_dmas[vring].async_register = false;
+
+               VHOST_LOG(INFO, "Succeed to unregister async for vid-%u vring-%u\n", vid, vring);
+       }
+
+skip:
        VHOST_LOG(INFO, "vring%u is %s\n",
                        vring, enable ? "enabled" : "disabled");
 
@@ -1159,6 +1374,12 @@ rte_eth_vhost_get_vid_from_port_id(uint16_t port_id)
        return vid;
 }
 
+void
+rte_eth_vhost_async_tx_poll_completed(bool enable)
+{
+       async_tx_poll_completed = enable;
+}
+
 static int
 eth_dev_configure(struct rte_eth_dev *dev)
 {
@@ -1220,6 +1441,7 @@ eth_dev_close(struct rte_eth_dev *dev)
 {
        struct pmd_internal *internal;
        struct internal_list *list;
+       struct vhost_queue *queue;
        unsigned int i, ret;
 
        if (rte_eal_process_type() != RTE_PROC_PRIMARY)
@@ -1233,6 +1455,25 @@ eth_dev_close(struct rte_eth_dev *dev)
 
        list = find_internal_resource(internal->iface_name);
        if (list) {
+               /* Make sure all in-flight packets are completed before destroying vhost */
+               if (dev->data->rx_queues) {
+                       for (i = 0; i < dev->data->nb_rx_queues; i++) {
+                               queue = dev->data->rx_queues[i];
+                               if (queue->async_register)
+                                       async_clear_virtqueue(queue->vid, queue->virtqueue_id,
+                                                       queue->dma_id);
+                       }
+               }
+
+               if (dev->data->tx_queues) {
+                       for (i = 0; i < dev->data->nb_tx_queues; i++) {
+                               queue = dev->data->tx_queues[i];
+                               if (queue->async_register)
+                                       async_clear_virtqueue(queue->vid, queue->virtqueue_id,
+                                                       queue->dma_id);
+                       }
+               }
+
                rte_vhost_driver_unregister(internal->iface_name);
                pthread_mutex_lock(&internal_list_lock);
                TAILQ_REMOVE(&internal_list, list, next);
@@ -1267,6 +1508,7 @@ eth_rx_queue_setup(struct rte_eth_dev *dev, uint16_t rx_queue_id,
                   struct rte_mempool *mb_pool)
 {
        struct vhost_queue *vq;
+       struct pmd_internal *internal = dev->data->dev_private;
 
        vq = rte_zmalloc_socket(NULL, sizeof(struct vhost_queue),
                        RTE_CACHE_LINE_SIZE, socket_id);
@@ -1277,6 +1519,8 @@ eth_rx_queue_setup(struct rte_eth_dev *dev, uint16_t rx_queue_id,
 
        vq->mb_pool = mb_pool;
        vq->virtqueue_id = rx_queue_id * VIRTIO_QNUM + VIRTIO_TXQ;
+       vq->async_register = internal->queue_dmas[vq->virtqueue_id].async_register;
+       vq->dma_id = internal->queue_dmas[vq->virtqueue_id].dma_id;
        rte_spinlock_init(&vq->intr_lock);
        dev->data->rx_queues[rx_queue_id] = vq;
 
@@ -1290,6 +1534,7 @@ eth_tx_queue_setup(struct rte_eth_dev *dev, uint16_t tx_queue_id,
                   const struct rte_eth_txconf *tx_conf __rte_unused)
 {
        struct vhost_queue *vq;
+       struct pmd_internal *internal = dev->data->dev_private;
 
        vq = rte_zmalloc_socket(NULL, sizeof(struct vhost_queue),
                        RTE_CACHE_LINE_SIZE, socket_id);
@@ -1299,6 +1544,8 @@ eth_tx_queue_setup(struct rte_eth_dev *dev, uint16_t tx_queue_id,
        }
 
        vq->virtqueue_id = tx_queue_id * VIRTIO_QNUM + VIRTIO_RXQ;
+       vq->async_register = internal->queue_dmas[vq->virtqueue_id].async_register;
+       vq->dma_id = internal->queue_dmas[vq->virtqueue_id].dma_id;
        rte_spinlock_init(&vq->intr_lock);
        dev->data->tx_queues[tx_queue_id] = vq;
 
@@ -1509,13 +1756,14 @@ static const struct eth_dev_ops ops = {
 static int
 eth_dev_vhost_create(struct rte_vdev_device *dev, char *iface_name,
        int16_t queues, const unsigned int numa_node, uint64_t flags,
-       uint64_t disable_flags)
+       uint64_t disable_flags, struct dma_input_info *dma_input)
 {
        const char *name = rte_vdev_device_name(dev);
        struct rte_eth_dev_data *data;
        struct pmd_internal *internal = NULL;
        struct rte_eth_dev *eth_dev = NULL;
        struct rte_ether_addr *eth_addr = NULL;
+       int i;
 
        VHOST_LOG(INFO, "Creating VHOST-USER backend on numa socket %u\n",
                numa_node);
@@ -1564,6 +1812,12 @@ eth_dev_vhost_create(struct rte_vdev_device *dev, char *iface_name,
        eth_dev->rx_pkt_burst = eth_vhost_rx;
        eth_dev->tx_pkt_burst = eth_vhost_tx;
 
+       for (i = 0; i < RTE_MAX_QUEUES_PER_PORT * 2; i++) {
+               /* Invalid DMA ID indicates the queue does not want to enable async data path */
+               internal->queue_dmas[i].dma_id = dma_input->dmas[i];
+               internal->queue_dmas[i].async_register = false;
+       }
+
        rte_eth_dev_probing_finish(eth_dev);
        return 0;
 
@@ -1603,6 +1857,155 @@ open_int(const char *key __rte_unused, const char *value, void *extra_args)
        return 0;
 }
 
+static int
+init_dma(int16_t dma_id, uint16_t ring_size)
+{
+       struct rte_dma_info info;
+       struct rte_dma_conf dev_config = { .nb_vchans = 1 };
+       struct rte_dma_vchan_conf qconf = {
+               .direction = RTE_DMA_DIR_MEM_TO_MEM,
+       };
+       int ret = 0;
+
+       if (dma_is_configured(dma_id))
+               goto out;
+
+       if (rte_dma_info_get(dma_id, &info) != 0) {
+               VHOST_LOG(ERR, "dma %u get info failed\n", dma_id);
+               ret = -1;
+               goto out;
+       }
+
+       if (info.max_vchans < 1) {
+               VHOST_LOG(ERR, "No channels available on dma %d\n", dma_id);
+               ret = -1;
+               goto out;
+       }
+
+       if (rte_dma_configure(dma_id, &dev_config) != 0) {
+               VHOST_LOG(ERR, "dma %u configure failed\n", dma_id);
+               ret = -1;
+               goto out;
+       }
+
+       rte_dma_info_get(dma_id, &info);
+       if (info.nb_vchans != 1) {
+               VHOST_LOG(ERR, "dma %u has no queues\n", dma_id);
+               ret = -1;
+               goto out;
+       }
+
+       ring_size = RTE_MAX(ring_size, MINIMUM_DMA_RING_SIZE);
+       ring_size = RTE_MAX(ring_size, info.min_desc);
+       qconf.nb_desc = RTE_MIN(ring_size, info.max_desc);
+       if (rte_dma_vchan_setup(dma_id, 0, &qconf) != 0) {
+               VHOST_LOG(ERR, "dma %u queue setup failed\n", dma_id);
+               ret = -1;
+               goto out;
+       }
+
+       if (rte_dma_start(dma_id) != 0) {
+               VHOST_LOG(ERR, "dma %u start failed\n", dma_id);
+               ret = -1;
+               goto out;
+       }
+
+       configured_dmas[dma_count++] = dma_id;
+
+out:
+       return ret;
+}
+
+static int
+open_dma(const char *key __rte_unused, const char *value, void *extra_args)
+{
+       struct dma_input_info *dma_input = extra_args;
+       char *input = strndup(value, strlen(value) + 1);
+       char *addrs = input;
+       char *ptrs[2];
+       char *start, *end, *substr;
+       uint16_t qid, virtqueue_id;
+       int16_t dma_id;
+       int i, ret = 0;
+
+       while (isblank(*addrs))
+               addrs++;
+       if (*addrs == '\0') {
+               VHOST_LOG(ERR, "No input DMA addresses\n");
+               ret = -1;
+               goto out;
+       }
+
+       /* process DMA devices within bracket. */
+       addrs++;
+       substr = strtok(addrs, ";]");
+       if (!substr) {
+               VHOST_LOG(ERR, "No input DMA addresses\n");
+               ret = -1;
+               goto out;
+       }
+
+       do {
+               rte_strsplit(substr, strlen(substr), ptrs, 2, '@');
+
+               char *txq, *rxq;
+               bool is_txq;
+
+               txq = strstr(ptrs[0], "txq");
+               rxq = strstr(ptrs[0], "rxq");
+               if (txq == NULL && rxq == NULL) {
+                       VHOST_LOG(ERR, "Illegal queue\n");
+                       ret = -1;
+                       goto out;
+               } else if (txq) {
+                       is_txq = true;
+                       start = txq;
+               } else {
+                       is_txq = false;
+                       start = rxq;
+               }
+
+               start += 3;
+               qid = strtol(start, &end, 0);
+               if (end == start) {
+                       VHOST_LOG(ERR, "No input queue ID\n");
+                       ret = -1;
+                       goto out;
+               }
+
+               virtqueue_id = is_txq ? qid * 2 + VIRTIO_RXQ : qid * 2 + VIRTIO_TXQ;
+
+               dma_id = rte_dma_get_dev_id_by_name(ptrs[1]);
+               if (dma_id < 0) {
+                       VHOST_LOG(ERR, "Fail to find DMA device %s.\n", ptrs[1]);
+                       ret = -1;
+                       goto out;
+               }
+
+               ret = init_dma(dma_id, dma_input->dma_ring_size);
+               if (ret != 0) {
+                       VHOST_LOG(ERR, "Fail to initialize DMA %u\n", dma_id);
+                       ret = -1;
+                       break;
+               }
+
+               dma_input->dmas[virtqueue_id] = dma_id;
+
+               substr = strtok(NULL, ";]");
+       } while (substr);
+
+       for (i = 0; i < dma_count; i++) {
+               if (rte_vhost_async_dma_configure(configured_dmas[i], 0) < 0) {
+                       VHOST_LOG(ERR, "Fail to configure DMA %u to vhost\n", configured_dmas[i]);
+                       ret = -1;
+               }
+       }
+
+out:
+       free(input);
+       return ret;
+}
+
 static int
 rte_pmd_vhost_probe(struct rte_vdev_device *dev)
 {
@@ -1621,6 +2024,10 @@ rte_pmd_vhost_probe(struct rte_vdev_device *dev)
        int legacy_ol_flags = 0;
        struct rte_eth_dev *eth_dev;
        const char *name = rte_vdev_device_name(dev);
+       struct dma_input_info dma_input;
+
+       memset(dma_input.dmas, INVALID_DMA_ID, sizeof(dma_input.dmas));
+       dma_input.dma_ring_size = DEFAULT_DMA_RING_SIZE;
 
        VHOST_LOG(INFO, "Initializing pmd_vhost for %s\n", name);
 
@@ -1736,6 +2143,43 @@ rte_pmd_vhost_probe(struct rte_vdev_device *dev)
                        goto out_free;
        }
 
+       if (rte_kvargs_count(kvlist, ETH_VHOST_DMA_RING_SIZE) == 1) {
+               if (rte_kvargs_count(kvlist, ETH_VHOST_DMA_ARG) == 1) {
+                       ret = rte_kvargs_process(kvlist, ETH_VHOST_DMA_RING_SIZE,
+                                       &open_int, &dma_input.dma_ring_size);
+                       if (ret < 0)
+                               goto out_free;
+
+                       if (!rte_is_power_of_2(dma_input.dma_ring_size)) {
+                               dma_input.dma_ring_size = rte_align32pow2(dma_input.dma_ring_size);
+                               VHOST_LOG(INFO, "Convert dma_ring_size to the power of two %u\n",
+                                               dma_input.dma_ring_size);
+                       }
+               } else {
+                       VHOST_LOG(WARNING, "%s is not specified, skip to parse %s\n",
+                                       ETH_VHOST_DMA_ARG, ETH_VHOST_DMA_RING_SIZE);
+               }
+       }
+
+       if (rte_kvargs_count(kvlist, ETH_VHOST_DMA_ARG) == 1) {
+               if (rte_kvargs_count(kvlist, ETH_VHOST_DMA_RING_SIZE) == 0)
+                       VHOST_LOG(INFO, "Use default dma ring size %d\n", DEFAULT_DMA_RING_SIZE);
+
+               ret = rte_kvargs_process(kvlist, ETH_VHOST_DMA_ARG,
+                                        &open_dma, &dma_input);
+               if (ret < 0) {
+                       VHOST_LOG(ERR, "Failed to parse %s\n", ETH_VHOST_DMA_ARG);
+                       goto out_free;
+               }
+
+               flags |= RTE_VHOST_USER_ASYNC_COPY;
+               /**
+                * Live migration is not supported when DMA
+                * acceleration is enabled.
+                */
+               disable_flags |= (1ULL << VHOST_F_LOG_ALL);
+       }
+
        if (legacy_ol_flags == 0)
                flags |= RTE_VHOST_USER_NET_COMPLIANT_OL_FLAGS;
 
@@ -1743,7 +2187,7 @@ rte_pmd_vhost_probe(struct rte_vdev_device *dev)
                dev->device.numa_node = rte_socket_id();
 
        ret = eth_dev_vhost_create(dev, iface_name, queues,
-                                  dev->device.numa_node, flags, disable_flags);
+                       dev->device.numa_node, flags, disable_flags, &dma_input);
        if (ret == -1)
                VHOST_LOG(ERR, "Failed to create %s\n", name);
 
@@ -1787,4 +2231,6 @@ RTE_PMD_REGISTER_PARAM_STRING(net_vhost,
        "postcopy-support=<0|1> "
        "tso=<0|1> "
        "linear-buffer=<0|1> "
-       "ext-buffer=<0|1>");
+       "ext-buffer=<0|1> "
+       "dma-ring-size=<int> "
+       "dmas=[txq0@dma_addr;rxq0@dma_addr]");
diff --git a/drivers/net/vhost/rte_eth_vhost.h b/drivers/net/vhost/rte_eth_vhost.h
index 0e68b9f668..146c98803d 100644
--- a/drivers/net/vhost/rte_eth_vhost.h
+++ b/drivers/net/vhost/rte_eth_vhost.h
@@ -52,6 +52,21 @@ int rte_eth_vhost_get_queue_event(uint16_t port_id,
  */
 int rte_eth_vhost_get_vid_from_port_id(uint16_t port_id);
 
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change, or be removed, without prior notice
+ *
+ * By default, rte_vhost_poll_enqueue_completed() is called in the Rx path.
+ * This function enables the Tx path, rather than the Rx path, to poll completed
+ * packets for vhost async enqueue operations. Note that virtio may never
+ * receive DMA completed packets if there are no more Tx operations.
+ *
+ * @param enable
+ *  True indicates that the Tx path calls rte_vhost_poll_enqueue_completed().
+ */
+__rte_experimental
+void rte_eth_vhost_async_tx_poll_completed(bool enable);
+
 #ifdef __cplusplus
 }
 #endif
diff --git a/drivers/net/vhost/version.map b/drivers/net/vhost/version.map
index e42c89f1eb..0a40441227 100644
--- a/drivers/net/vhost/version.map
+++ b/drivers/net/vhost/version.map
@@ -6,3 +6,10 @@ DPDK_23 {
 
        local: *;
 };
+
+EXPERIMENTAL {
+       global:
+
+       # added in 22.11
+       rte_eth_vhost_async_tx_poll_completed;
+};
diff --git a/drivers/net/vhost/vhost_testpmd.c b/drivers/net/vhost/vhost_testpmd.c
new file mode 100644
index 0000000000..11990ea152
--- /dev/null
+++ b/drivers/net/vhost/vhost_testpmd.c
@@ -0,0 +1,67 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2022 Intel Corporation.
+ */
+#include <rte_eth_vhost.h>
+#include <cmdline_parse_num.h>
+#include <cmdline_parse_string.h>
+
+#include "testpmd.h"
+
+struct cmd_tx_poll_result {
+       cmdline_fixed_string_t async_vhost;
+       cmdline_fixed_string_t tx;
+       cmdline_fixed_string_t poll;
+       cmdline_fixed_string_t completed;
+       cmdline_fixed_string_t what;
+};
+
+static cmdline_parse_token_string_t cmd_tx_async_vhost =
+       TOKEN_STRING_INITIALIZER(struct cmd_tx_poll_result, async_vhost, "async_vhost");
+static cmdline_parse_token_string_t cmd_tx_tx =
+       TOKEN_STRING_INITIALIZER(struct cmd_tx_poll_result, tx, "tx");
+static cmdline_parse_token_string_t cmd_tx_poll =
+       TOKEN_STRING_INITIALIZER(struct cmd_tx_poll_result, poll, "poll");
+static cmdline_parse_token_string_t cmd_tx_completed =
+       TOKEN_STRING_INITIALIZER(struct cmd_tx_poll_result, completed, "completed");
+static cmdline_parse_token_string_t cmd_tx_what =
+       TOKEN_STRING_INITIALIZER(struct cmd_tx_poll_result, what, "on#off");
+
+static void
+cmd_tx_poll_parsed(void *parsed_result, __rte_unused struct cmdline *cl, __rte_unused void *data)
+{
+       struct cmd_tx_poll_result *res = parsed_result;
+
+       if (!strcmp(res->what, "on"))
+               rte_eth_vhost_async_tx_poll_completed(true);
+       else if (!strcmp(res->what, "off"))
+               rte_eth_vhost_async_tx_poll_completed(false);
+       else
+               fprintf(stderr, "Unknown parameter\n");
+}
+
+static cmdline_parse_inst_t async_vhost_cmd_tx_poll = {
+       .f = cmd_tx_poll_parsed,
+       .data = NULL,
+       .help_str = "async-vhost tx poll completed on|off",
+       .tokens = {
+               (void *)&cmd_tx_async_vhost,
+               (void *)&cmd_tx_tx,
+               (void *)&cmd_tx_poll,
+               (void *)&cmd_tx_completed,
+               (void *)&cmd_tx_what,
+               NULL,
+       },
+};
+
+static struct testpmd_driver_commands async_vhost_cmds = {
+       .commands = {
+       {
+               &async_vhost_cmd_tx_poll,
+               "async_vhost tx poll completed (on|off)\n"
+               "    Poll and free DMA completed packets in Tx path.\n",
+       },
+       { NULL, NULL },
+       },
+};
+
+TESTPMD_ADD_DRIVER_COMMANDS(async_vhost_cmds)
-- 
2.25.1
