Hi Cheng, Some comments are inline.
Thanks, Jiayu > -----Original Message----- > From: Jiang, Cheng1 <cheng1.ji...@intel.com> > Sent: Friday, December 25, 2020 4:07 PM > To: maxime.coque...@redhat.com; Xia, Chenbo <chenbo....@intel.com> > Cc: dev@dpdk.org; Hu, Jiayu <jiayu...@intel.com>; Yang, YvonneX > <yvonnex.y...@intel.com>; Jiang, Cheng1 <cheng1.ji...@intel.com> > Subject: [PATCH v4 2/2] examples/vhost: refactor vhost data path > > Change the vm2vm data path to batch enqueue for better performance. > Support latest async vhost API, refactor vhost async data path, > replase rte_atomicNN_xxx to atomic_XXX and clean some codes. Typo: replase -> replace > > Signed-off-by: Cheng Jiang <cheng1.ji...@intel.com> > --- > examples/vhost/main.c | 202 +++++++++++++++++++++++++++++++----------- > examples/vhost/main.h | 7 +- > 2 files changed, 154 insertions(+), 55 deletions(-) > > diff --git a/examples/vhost/main.c b/examples/vhost/main.c > index 8d8c3038b..3ea12a474 100644 > --- a/examples/vhost/main.c > +++ b/examples/vhost/main.c > @@ -179,9 +179,18 @@ struct mbuf_table { > struct rte_mbuf *m_table[MAX_PKT_BURST]; > }; > > +struct vhost_bufftable { > + uint32_t len; > + uint64_t pre_tsc; > + struct rte_mbuf *m_table[MAX_PKT_BURST]; > +}; > + > /* TX queue for each data core. */ > struct mbuf_table lcore_tx_queue[RTE_MAX_LCORE]; > > +/* TX queue for each vhost device. */ Every lcore maintains a TX buffer for every vhost device, which is to batch pkts to enqueue for higher performance. I suggest you to update the description of vhost_txbuff above, as it is not very clear. > +struct vhost_bufftable *vhost_txbuff[RTE_MAX_LCORE * > MAX_VHOST_DEVICE]; > + > #define MBUF_TABLE_DRAIN_TSC ((rte_get_tsc_hz() + US_PER_S - 1) \ > / US_PER_S * BURST_TX_DRAIN_US) > #define VLAN_HLEN 4 > @@ -804,39 +813,114 @@ unlink_vmdq(struct vhost_dev *vdev) > } > } > > +static inline void > +free_pkts(struct rte_mbuf **pkts, uint16_t n) > +{ > + while (n--) > + rte_pktmbuf_free(pkts[n]); > +} > + > static __rte_always_inline void > -virtio_xmit(struct vhost_dev *dst_vdev, struct vhost_dev *src_vdev, > +complete_async_pkts(struct vhost_dev *vdev) > +{ > + struct rte_mbuf *p_cpl[MAX_PKT_BURST]; > + uint16_t complete_count; > + > + complete_count = rte_vhost_poll_enqueue_completed(vdev->vid, > + VIRTIO_RXQ, p_cpl, > MAX_PKT_BURST); > + if (complete_count) { > + atomic_fetch_sub(&vdev->nr_async_pkts, complete_count); > + free_pkts(p_cpl, complete_count); > + } > +} > + > +static __rte_always_inline void > +sync_virtio_xmit(struct vhost_dev *dst_vdev, struct vhost_dev *src_vdev, > struct rte_mbuf *m) > { > uint16_t ret; > - struct rte_mbuf *m_cpl[1]; > > if (builtin_net_driver) { > ret = vs_enqueue_pkts(dst_vdev, VIRTIO_RXQ, &m, 1); > - } else if (async_vhost_driver) { > - ret = rte_vhost_submit_enqueue_burst(dst_vdev->vid, > VIRTIO_RXQ, > - &m, 1); > - > - if (likely(ret)) > - dst_vdev->nr_async_pkts++; > - > - while (likely(dst_vdev->nr_async_pkts)) { > - if (rte_vhost_poll_enqueue_completed(dst_vdev- > >vid, > - VIRTIO_RXQ, m_cpl, 1)) > - dst_vdev->nr_async_pkts--; > - } > } else { > ret = rte_vhost_enqueue_burst(dst_vdev->vid, VIRTIO_RXQ, > &m, 1); > } > > if (enable_stats) { > - rte_atomic64_inc(&dst_vdev->stats.rx_total_atomic); > - rte_atomic64_add(&dst_vdev->stats.rx_atomic, ret); > + atomic_fetch_add(&dst_vdev->stats.rx_total_atomic, 1); > + atomic_fetch_add(&dst_vdev->stats.rx_atomic, ret); > src_vdev->stats.tx_total++; > src_vdev->stats.tx += ret; > } > } > > +static __rte_always_inline void > +drain_vhost(struct vhost_dev *vdev) > +{ > + uint16_t ret; > + uint64_t queue_id = rte_lcore_id() * MAX_VHOST_DEVICE + vdev- > >vid; > + uint16_t nr_xmit = vhost_txbuff[queue_id]->len; > + struct rte_mbuf **m = vhost_txbuff[queue_id]->m_table; "queue_id" is not a very good name, as it's not the queue id of vhost device, but a buffer index which holds pkts to enqueue. > + > + if (builtin_net_driver) { > + ret = vs_enqueue_pkts(vdev, VIRTIO_RXQ, m, nr_xmit); > + } else if (async_vhost_driver) { > + uint32_t cpu_cpl_nr = 0; > + uint16_t enqueue_fail = 0; > + struct rte_mbuf *m_cpu_cpl[nr_xmit]; > + > + complete_async_pkts(vdev); > + ret = rte_vhost_submit_enqueue_burst(vdev->vid, > VIRTIO_RXQ, > + m, nr_xmit, m_cpu_cpl, &cpu_cpl_nr); > + atomic_fetch_add(&vdev->nr_async_pkts, ret - cpu_cpl_nr); > + > + if (cpu_cpl_nr) > + free_pkts(m_cpu_cpl, cpu_cpl_nr); > + > + enqueue_fail = nr_xmit - ret; > + if (enqueue_fail) > + free_pkts(&m[ret], nr_xmit - ret); > + } else { > + ret = rte_vhost_enqueue_burst(vdev->vid, VIRTIO_RXQ, > + m, nr_xmit); > + } > + > + if (enable_stats) { > + atomic_fetch_add(&vdev->stats.rx_total_atomic, nr_xmit); > + atomic_fetch_add(&vdev->stats.rx_atomic, ret); > + } > + > + if (!async_vhost_driver) > + free_pkts(m, nr_xmit); > +} > + > +static __rte_always_inline void > +drain_vhost_table(void) > +{ > + const uint16_t lcore_id = rte_lcore_id(); > + struct vhost_bufftable *vhost_txq; > + struct vhost_dev *vdev; > + uint64_t cur_tsc; > + > + TAILQ_FOREACH(vdev, &lcore_info[lcore_id].vdev_list, > lcore_vdev_entry) { A lcore may have pkts to enqueue for any vhost device, as it's decided by mac address. So drain_vhost_table() shouldn't just process vhost ports allocated to its lcore, but all vhost ports that have un-sent packets. > + if (!vdev->remove) { > + vhost_txq = vhost_txbuff[lcore_id * > MAX_VHOST_DEVICE > + + vdev->vid]; > + cur_tsc = rte_rdtsc(); > + > + if (unlikely(cur_tsc - vhost_txq->pre_tsc > + > MBUF_TABLE_DRAIN_TSC)) { > + RTE_LOG_DP(DEBUG, VHOST_DATA, > + "Vhost tX queue drained after "tX" -> "TX" > timeout with burst size %u\n", > + vhost_txq->len); > + drain_vhost(vdev); > + vhost_txq->len = 0; > + vhost_txq->pre_tsc = cur_tsc; > + } > + } > + } > +} > + > /* > * Check if the packet destination MAC address is for a local device. If so > then > put > * the packet on that devices RX queue. If not then return. > @@ -846,7 +930,8 @@ virtio_tx_local(struct vhost_dev *vdev, struct > rte_mbuf *m) > { > struct rte_ether_hdr *pkt_hdr; > struct vhost_dev *dst_vdev; > - > + struct vhost_bufftable *vhost_txq; > + const uint16_t lcore_id = rte_lcore_id(); Why use "const"? > pkt_hdr = rte_pktmbuf_mtod(m, struct rte_ether_hdr *); > > dst_vdev = find_vhost_dev(&pkt_hdr->d_addr); > @@ -869,7 +954,19 @@ virtio_tx_local(struct vhost_dev *vdev, struct > rte_mbuf *m) > return 0; > } > > - virtio_xmit(dst_vdev, vdev, m); > + vhost_txq = vhost_txbuff[lcore_id * MAX_VHOST_DEVICE + dst_vdev- > >vid]; > + vhost_txq->m_table[vhost_txq->len++] = m; > + > + if (enable_stats) { > + vdev->stats.tx_total++; > + vdev->stats.tx++; > + } > + > + if (unlikely(vhost_txq->len == MAX_PKT_BURST)) { > + drain_vhost(dst_vdev); > + vhost_txq->len = 0; > + vhost_txq->pre_tsc = rte_rdtsc(); > + } > return 0; > } > > @@ -940,13 +1037,6 @@ static void virtio_tx_offload(struct rte_mbuf *m) > tcp_hdr->cksum = get_psd_sum(l3_hdr, m->ol_flags); > } > > -static inline void > -free_pkts(struct rte_mbuf **pkts, uint16_t n) > -{ > - while (n--) > - rte_pktmbuf_free(pkts[n]); > -} > - > static __rte_always_inline void > do_drain_mbuf_table(struct mbuf_table *tx_q) > { > @@ -979,16 +1069,14 @@ virtio_tx_route(struct vhost_dev *vdev, struct > rte_mbuf *m, uint16_t vlan_tag) > > TAILQ_FOREACH(vdev2, &vhost_dev_list, global_vdev_entry) > { > if (vdev2 != vdev) > - virtio_xmit(vdev2, vdev, m); > + sync_virtio_xmit(vdev2, vdev, m); > } > goto queue2nic; > } > > /*check if destination is local VM*/ > - if ((vm2vm_mode == VM2VM_SOFTWARE) && (virtio_tx_local(vdev, > m) == 0)) { > - rte_pktmbuf_free(m); > + if ((vm2vm_mode == VM2VM_SOFTWARE) && (virtio_tx_local(vdev, > m) == 0)) > return; > - } > > if (unlikely(vm2vm_mode == VM2VM_HARDWARE)) { > if (unlikely(find_local_dest(vdev, m, &offset, > @@ -1073,19 +1161,6 @@ drain_mbuf_table(struct mbuf_table *tx_q) > } > } > > -static __rte_always_inline void > -complete_async_pkts(struct vhost_dev *vdev, uint16_t qid) > -{ > - struct rte_mbuf *p_cpl[MAX_PKT_BURST]; > - uint16_t complete_count; > - > - complete_count = rte_vhost_poll_enqueue_completed(vdev->vid, > - qid, p_cpl, MAX_PKT_BURST); > - vdev->nr_async_pkts -= complete_count; > - if (complete_count) > - free_pkts(p_cpl, complete_count); > -} > - > static __rte_always_inline void > drain_eth_rx(struct vhost_dev *vdev) > { > @@ -1095,9 +1170,6 @@ drain_eth_rx(struct vhost_dev *vdev) > rx_count = rte_eth_rx_burst(ports[0], vdev->vmdq_rx_q, > pkts, MAX_PKT_BURST); > > - while (likely(vdev->nr_async_pkts)) > - complete_async_pkts(vdev, VIRTIO_RXQ); > - > if (!rx_count) > return; > > @@ -1123,17 +1195,31 @@ drain_eth_rx(struct vhost_dev *vdev) > enqueue_count = vs_enqueue_pkts(vdev, VIRTIO_RXQ, > pkts, rx_count); > } else if (async_vhost_driver) { > + uint32_t cpu_cpl_nr = 0; > + uint16_t enqueue_fail = 0; > + struct rte_mbuf *m_cpu_cpl[MAX_PKT_BURST]; > + > + complete_async_pkts(vdev); > enqueue_count = rte_vhost_submit_enqueue_burst(vdev- > >vid, > - VIRTIO_RXQ, pkts, rx_count); > - vdev->nr_async_pkts += enqueue_count; > + VIRTIO_RXQ, pkts, rx_count, > + m_cpu_cpl, &cpu_cpl_nr); > + atomic_fetch_add(&vdev->nr_async_pkts, > + enqueue_count - cpu_cpl_nr); > + if (cpu_cpl_nr) > + free_pkts(m_cpu_cpl, cpu_cpl_nr); > + > + enqueue_fail = rx_count - enqueue_count; > + if (enqueue_fail) > + free_pkts(&pkts[enqueue_count], enqueue_fail); > + > } else { > enqueue_count = rte_vhost_enqueue_burst(vdev->vid, > VIRTIO_RXQ, > pkts, rx_count); > } > > if (enable_stats) { > - rte_atomic64_add(&vdev->stats.rx_total_atomic, rx_count); > - rte_atomic64_add(&vdev->stats.rx_atomic, enqueue_count); > + atomic_fetch_add(&vdev->stats.rx_total_atomic, rx_count); > + atomic_fetch_add(&vdev->stats.rx_atomic, enqueue_count); > } > > if (!async_vhost_driver) > @@ -1202,7 +1288,7 @@ switch_worker(void *arg __rte_unused) > > while(1) { > drain_mbuf_table(tx_q); > - > + drain_vhost_table(); > /* > * Inform the configuration core that we have exited the > * linked list and that no devices are in use if requested. > @@ -1298,6 +1384,7 @@ static int > new_device(int vid) > { > int lcore, core_add = 0; > + uint16_t i; > uint32_t device_num_min = num_devices; > struct vhost_dev *vdev; > vdev = rte_zmalloc("vhost device", sizeof(*vdev), > RTE_CACHE_LINE_SIZE); > @@ -1309,6 +1396,13 @@ new_device(int vid) > } > vdev->vid = vid; > > + for (i = 0; i < RTE_MAX_LCORE; i++) { > + vhost_txbuff[i * MAX_VHOST_DEVICE + vid] > + = rte_zmalloc("vhost bufftable", > + sizeof(struct vhost_bufftable), > + RTE_CACHE_LINE_SIZE); Rte_zmalloc() may fail. Need to handle failure. > + } > + > if (builtin_net_driver) > vs_vhost_net_setup(vdev); > > @@ -1343,12 +1437,15 @@ new_device(int vid) > if (async_vhost_driver) { > struct rte_vhost_async_features f; > struct rte_vhost_async_channel_ops channel_ops; > + > if (strncmp(dma_type, "ioat", 4) == 0) { > channel_ops.transfer_data = ioat_transfer_data_cb; > channel_ops.check_completed_copies = > ioat_check_completed_copies_cb; > + > f.async_inorder = 1; > f.async_threshold = 256; > + > return rte_vhost_async_channel_register(vid, > VIRTIO_RXQ, > f.intval, &channel_ops); > } > @@ -1392,8 +1489,8 @@ print_stats(__rte_unused void *arg) > tx = vdev->stats.tx; > tx_dropped = tx_total - tx; > > - rx_total = rte_atomic64_read(&vdev- > >stats.rx_total_atomic); > - rx = rte_atomic64_read(&vdev->stats.rx_atomic); > + rx_total = atomic_load(&vdev- > >stats.rx_total_atomic); > + rx = atomic_load(&vdev->stats.rx_atomic); > rx_dropped = rx_total - rx; > > printf("Statistics for device %d\n" > @@ -1592,6 +1689,7 @@ main(int argc, char *argv[]) > /* Register vhost user driver to handle vhost messages. */ > for (i = 0; i < nb_sockets; i++) { > char *file = socket_files + i * PATH_MAX; > + > if (async_vhost_driver) > flags = flags | RTE_VHOST_USER_ASYNC_COPY; > > diff --git a/examples/vhost/main.h b/examples/vhost/main.h > index 4317b6ae8..6aa798a3e 100644 > --- a/examples/vhost/main.h > +++ b/examples/vhost/main.h > @@ -8,6 +8,7 @@ > #include <sys/queue.h> > > #include <rte_ether.h> > +#include <stdatomic.h> > > /* Macros for printing using RTE_LOG */ > #define RTE_LOGTYPE_VHOST_CONFIG RTE_LOGTYPE_USER1 > @@ -21,8 +22,8 @@ enum {VIRTIO_RXQ, VIRTIO_TXQ, VIRTIO_QNUM}; > struct device_statistics { > uint64_t tx; > uint64_t tx_total; > - rte_atomic64_t rx_atomic; > - rte_atomic64_t rx_total_atomic; > + atomic_int_least64_t rx_atomic; > + atomic_int_least64_t rx_total_atomic; > }; > > struct vhost_queue { > @@ -51,7 +52,7 @@ struct vhost_dev { > uint64_t features; > size_t hdr_len; > uint16_t nr_vrings; > - uint16_t nr_async_pkts; > + atomic_int_least16_t nr_async_pkts; > struct rte_vhost_memory *mem; > struct device_statistics stats; > TAILQ_ENTRY(vhost_dev) global_vdev_entry; > -- > 2.29.2