Hi Marvin,

Is it possible to vectorize the processing?

Other comments inline:
/Gavin

> -----Original Message-----
> From: dev <dev-boun...@dpdk.org> On Behalf Of Marvin Liu
> Sent: Friday, September 20, 2019 12:37 AM
> To: maxime.coque...@redhat.com; tiwei....@intel.com;
> zhihong.w...@intel.com
> Cc: dev@dpdk.org; Marvin Liu <yong....@intel.com>
> Subject: [dpdk-dev] [PATCH v2 03/16] vhost: add burst enqueue function for
> packed ring
>
> The burst enqueue function will first check whether the descriptors are
> cache aligned, and will also check other prerequisites at the beginning.
> The burst enqueue function does not support chained mbufs; the single
> packet enqueue function will handle those.
>
> Signed-off-by: Marvin Liu <yong....@intel.com>
>
> diff --git a/lib/librte_vhost/vhost.h b/lib/librte_vhost/vhost.h
> index 5074226f0..67889c80a 100644
> --- a/lib/librte_vhost/vhost.h
> +++ b/lib/librte_vhost/vhost.h
> @@ -39,6 +39,9 @@
>
>  #define VHOST_LOG_CACHE_NR 32
>
> +#define PACKED_DESCS_BURST (RTE_CACHE_LINE_SIZE / \
> +				sizeof(struct vring_packed_desc))
> +
>  #ifdef SUPPORT_GCC_UNROLL_PRAGMA
>  #define PRAGMA_PARAM "GCC unroll 4"
>  #endif
> @@ -57,6 +60,8 @@
>  #define UNROLL_PRAGMA(param) do {} while(0);
>  #endif
>
> +#define PACKED_BURST_MASK (PACKED_DESCS_BURST - 1)
> +
>  /**
>   * Structure contains buffer address, length and descriptor index
>   * from vring to do scatter RX.
> diff --git a/lib/librte_vhost/virtio_net.c b/lib/librte_vhost/virtio_net.c
> index 2b5c47145..c664b27c5 100644
> --- a/lib/librte_vhost/virtio_net.c
> +++ b/lib/librte_vhost/virtio_net.c
> @@ -895,6 +895,84 @@ virtio_dev_rx_split(struct virtio_net *dev, struct
> vhost_virtqueue *vq,
>  	return pkt_idx;
>  }
>
> +static __rte_unused __rte_always_inline int

I remember "__rte_always_inline" should start on a separate line of its
own, otherwise you will get a style issue.
/Gavin

> +virtio_dev_rx_burst_packed(struct virtio_net *dev, struct vhost_virtqueue
> *vq,
> +	struct rte_mbuf **pkts)
> +{
> +	bool wrap_counter = vq->avail_wrap_counter;
> +	struct vring_packed_desc *descs = vq->desc_packed;
> +	uint16_t avail_idx = vq->last_avail_idx;
> +
> +	uint64_t desc_addrs[PACKED_DESCS_BURST];
> +	struct virtio_net_hdr_mrg_rxbuf *hdrs[PACKED_DESCS_BURST];
> +	uint32_t buf_offset = dev->vhost_hlen;
> +	uint64_t lens[PACKED_DESCS_BURST];
> +
> +	uint16_t i;
> +
> +	if (unlikely(avail_idx & PACKED_BURST_MASK))
> +		return -1;
> +
> +	UNROLL_PRAGMA(PRAGMA_PARAM)
> +	for (i = 0; i < PACKED_DESCS_BURST; i++) {
> +		if (unlikely(pkts[i]->next != NULL))
> +			return -1;
> +		if (unlikely(!desc_is_avail(&descs[avail_idx + i],
> +					wrap_counter)))
> +			return -1;
> +	}
> +
> +	rte_smp_rmb();
> +
> +	UNROLL_PRAGMA(PRAGMA_PARAM)
> +	for (i = 0; i < PACKED_DESCS_BURST; i++)
> +		lens[i] = descs[avail_idx + i].len;

Looks like the code is a strong candidate for vectorization.
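To make the vectorization question concrete: the availability check alone
could cover a whole burst with a single 64-byte load. Below is a minimal
sketch, assuming PACKED_DESCS_BURST == 4, the 16-byte struct
vring_packed_desc layout, an AVX-512BW target, and the
VRING_DESC_F_AVAIL/VRING_DESC_F_USED macros from vhost.h;
desc_is_avail_burst is a hypothetical helper, not part of this series:

#include <immintrin.h>

/*
 * Sketch: test all four descriptors of a burst in one shot.
 * The 16-bit flags field sits at byte offset 14 of each 16-byte
 * descriptor, i.e. in 16-bit lanes 7, 15, 23 and 31 of the
 * one-cache-line load below.
 */
static inline int
desc_is_avail_burst(const struct vring_packed_desc *descs,
		uint16_t avail_idx, bool wrap_counter)
{
	const __mmask32 flags_lanes = 0x80808080; /* lanes 7/15/23/31 */
	uint16_t expect = wrap_counter ? VRING_DESC_F_AVAIL :
			VRING_DESC_F_USED;
	__m512i v_descs, v_masked;
	__mmask32 cmp;

	/* one 64B load covers the whole burst */
	v_descs = _mm512_loadu_si512((const void *)&descs[avail_idx]);

	/* keep only the AVAIL/USED bits in every 16-bit lane */
	v_masked = _mm512_and_si512(v_descs,
			_mm512_set1_epi16((short)(VRING_DESC_F_AVAIL |
				VRING_DESC_F_USED)));

	/* all four flags lanes must match the expected pattern */
	cmp = _mm512_mask_cmpeq_epu16_mask(flags_lanes, v_masked,
			_mm512_set1_epi16((short)expect));

	/* the caller still needs rte_smp_rmb() before using the descs */
	return cmp == flags_lanes;
}

The length loads and checks below could be folded into the same 64-byte
load in a similar way.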
> +
> +	UNROLL_PRAGMA(PRAGMA_PARAM)
> +	for (i = 0; i < PACKED_DESCS_BURST; i++) {
> +		if (unlikely(pkts[i]->pkt_len > (lens[i] - buf_offset)))
> +			return -1;
> +	}
> +
> +	UNROLL_PRAGMA(PRAGMA_PARAM)
> +	for (i = 0; i < PACKED_DESCS_BURST; i++)
> +		desc_addrs[i] = vhost_iova_to_vva(dev, vq,
> +				descs[avail_idx + i].addr,
> +				&lens[i],
> +				VHOST_ACCESS_RW);
> +	UNROLL_PRAGMA(PRAGMA_PARAM)
> +	for (i = 0; i < PACKED_DESCS_BURST; i++) {
> +		if (unlikely(lens[i] != descs[avail_idx + i].len))
> +			return -1;
> +	}
> +
> +	UNROLL_PRAGMA(PRAGMA_PARAM)
> +	for (i = 0; i < PACKED_DESCS_BURST; i++) {
> +		rte_prefetch0((void *)(uintptr_t)desc_addrs[i]);
> +		hdrs[i] = (struct virtio_net_hdr_mrg_rxbuf *)desc_addrs[i];
> +		lens[i] = pkts[i]->pkt_len + dev->vhost_hlen;
> +	}
> +
> +	UNROLL_PRAGMA(PRAGMA_PARAM)
> +	for (i = 0; i < PACKED_DESCS_BURST; i++)
> +		virtio_enqueue_offload(pkts[i], &hdrs[i]->hdr);
> +

A store barrier is missing here; last_avail_idx may be observed before
the enqueue above has completed on architectures with a weak memory
model. For x86, a compiler barrier is also required.
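For illustration, something along these lines placed before the index
update should cover both cases, since rte_smp_wmb() maps to a DMB on
weakly ordered CPUs and degrades to a plain compiler barrier on x86 (a
sketch of the placement only, not the exact patch):

	/* make the header and payload stores above visible before
	 * the updated index can be observed
	 */
	rte_smp_wmb();

	vq->last_avail_idx += PACKED_DESCS_BURST;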
> +	vq->last_avail_idx += PACKED_DESCS_BURST;
> +	if (vq->last_avail_idx >= vq->size) {
> +		vq->last_avail_idx -= vq->size;
> +		vq->avail_wrap_counter ^= 1;
> +	}
> +
> +	UNROLL_PRAGMA(PRAGMA_PARAM)
> +	for (i = 0; i < PACKED_DESCS_BURST; i++) {
> +		rte_memcpy((void *)(uintptr_t)(desc_addrs[i] + buf_offset),
> +			rte_pktmbuf_mtod_offset(pkts[i], void *, 0),
> +			pkts[i]->pkt_len);
> +	}
> +
> +	return 0;
> +}
> +
>  static __rte_unused int16_t
>  virtio_dev_rx_single_packed(struct virtio_net *dev, struct vhost_virtqueue
> *vq,
>  	struct rte_mbuf *pkt)
> --
> 2.17.1
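As a side note on how I expect the two paths to pair up once the series
wires them in: the caller could dispatch roughly as below. This is only a
sketch; virtio_dev_rx_packed is a hypothetical wrapper name, the flush of
used descriptors is omitted, and I am assuming
virtio_dev_rx_single_packed() returns a negative value on failure:

static uint16_t
virtio_dev_rx_packed(struct virtio_net *dev, struct vhost_virtqueue *vq,
		struct rte_mbuf **pkts, uint32_t count)
{
	uint32_t pkt_idx = 0;

	do {
		rte_prefetch0(&vq->desc_packed[vq->last_avail_idx]);

		/* try the cache-aligned burst path first ... */
		if (count - pkt_idx >= PACKED_DESCS_BURST &&
		    !virtio_dev_rx_burst_packed(dev, vq, &pkts[pkt_idx])) {
			pkt_idx += PACKED_DESCS_BURST;
			continue;
		}

		/* ... and fall back to the single-packet path for
		 * chained mbufs or unaligned descriptor indexes
		 */
		if (virtio_dev_rx_single_packed(dev, vq, pkts[pkt_idx]) < 0)
			break;

		pkt_idx++;
	} while (pkt_idx < count);

	return pkt_idx;
}

/Gavin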