> -----Original Message-----
> From: Mattias Rönnblom [mailto:hof...@lysator.liu.se]
> Sent: Thursday, September 26, 2019 3:31 AM
> To: Liu, Yong <yong....@intel.com>; maxime.coque...@redhat.com; Bie, Tiwei <tiwei....@intel.com>; Wang, Zhihong <zhihong.w...@intel.com>; step...@networkplumber.org; gavin...@arm.com
> Cc: dev@dpdk.org
> Subject: Re: [dpdk-dev] [PATCH v3 03/15] vhost: add batch enqueue function for packed ring
>
> On 2019-09-25 19:13, Marvin Liu wrote:
> > Batch enqueue function will first check whether descriptors are cache
> > aligned. It will also check prerequisites in the beginning. Batch
> > enqueue function not support chained mbufs, single packet enqueue
> > function will handle it.
> >
> > Signed-off-by: Marvin Liu <yong....@intel.com>
> >
> > diff --git a/lib/librte_vhost/vhost.h b/lib/librte_vhost/vhost.h
> > index 4cba8c5ef..e241436c7 100644
> > --- a/lib/librte_vhost/vhost.h
> > +++ b/lib/librte_vhost/vhost.h
> > @@ -39,6 +39,10 @@
> >
> >  #define VHOST_LOG_CACHE_NR 32
> >
> > +#define PACKED_BATCH_SIZE (RTE_CACHE_LINE_SIZE / \
> > +                        sizeof(struct vring_packed_desc))
> > +#define PACKED_BATCH_MASK (PACKED_BATCH_SIZE - 1)
> > +
> >  #ifdef SUPPORT_GCC_UNROLL_PRAGMA
> >  #define UNROLL_PRAGMA_PARAM "GCC unroll 4"
> >  #endif
> > diff --git a/lib/librte_vhost/virtio_net.c b/lib/librte_vhost/virtio_net.c
> > index 520c4c6a8..5e08f7d9b 100644
> > --- a/lib/librte_vhost/virtio_net.c
> > +++ b/lib/librte_vhost/virtio_net.c
> > @@ -883,6 +883,86 @@ virtio_dev_rx_split(struct virtio_net *dev, struct vhost_virtqueue *vq,
> >          return pkt_idx;
> >  }
> >
> > +static __rte_unused int
> > +virtio_dev_rx_batch_packed(struct virtio_net *dev, struct vhost_virtqueue *vq,
> > +                          struct rte_mbuf **pkts)
> > +{
> > +        bool wrap_counter = vq->avail_wrap_counter;
> > +        struct vring_packed_desc *descs = vq->desc_packed;
> > +        uint16_t avail_idx = vq->last_avail_idx;
> > +        uint64_t desc_addrs[PACKED_BATCH_SIZE];
> > +        struct virtio_net_hdr_mrg_rxbuf *hdrs[PACKED_BATCH_SIZE];
> > +        uint32_t buf_offset = dev->vhost_hlen;
> > +        uint64_t lens[PACKED_BATCH_SIZE];
> > +        uint16_t i;
> > +
> > +        if (unlikely(avail_idx & PACKED_BATCH_MASK))
> > +                return -1;
>
> Does this really generate better code than just "avail_idx < PACKED_BATCH_SIZE"? and+jne vs cmp+jbe.
Hi Mattias,

This comparison checks whether the descriptor location is cache aligned.
On x86 the cache line size is 64 bytes, so the mask here is 0x3. The check
will be and + test + je, which is very simple. Most of the time its cost
is eliminated, as the branch outcome can be predicted. A small stand-alone
sketch of this check is included after the quoted patch below.

Thanks,
Marvin

>
> > +
> > +        if (unlikely((avail_idx + PACKED_BATCH_SIZE) > vq->size))
> > +                return -1;
> > +
> > +        UNROLL_PRAGMA(UNROLL_PRAGMA_PARAM)
> > +        for (i = 0; i < PACKED_BATCH_SIZE; i++) {
> > +                if (unlikely(pkts[i]->next != NULL))
> > +                        return -1;
> > +                if (unlikely(!desc_is_avail(&descs[avail_idx + i],
> > +                                            wrap_counter)))
> > +                        return -1;
> > +        }
> > +
> > +        rte_smp_rmb();
> > +
> > +        UNROLL_PRAGMA(UNROLL_PRAGMA_PARAM)
> > +        for (i = 0; i < PACKED_BATCH_SIZE; i++)
> > +                lens[i] = descs[avail_idx + i].len;
> > +
> > +        UNROLL_PRAGMA(UNROLL_PRAGMA_PARAM)
> > +        for (i = 0; i < PACKED_BATCH_SIZE; i++) {
> > +                if (unlikely(pkts[i]->pkt_len > (lens[i] - buf_offset)))
> > +                        return -1;
> > +        }
> > +
> > +        UNROLL_PRAGMA(UNROLL_PRAGMA_PARAM)
> > +        for (i = 0; i < PACKED_BATCH_SIZE; i++)
> > +                desc_addrs[i] = vhost_iova_to_vva(dev, vq,
> > +                                                  descs[avail_idx + i].addr,
> > +                                                  &lens[i],
> > +                                                  VHOST_ACCESS_RW);
> > +        UNROLL_PRAGMA(UNROLL_PRAGMA_PARAM)
> > +        for (i = 0; i < PACKED_BATCH_SIZE; i++) {
> > +                if (unlikely(lens[i] != descs[avail_idx + i].len))
> > +                        return -1;
> > +        }
> > +
> > +        UNROLL_PRAGMA(UNROLL_PRAGMA_PARAM)
> > +        for (i = 0; i < PACKED_BATCH_SIZE; i++) {
> > +                rte_prefetch0((void *)(uintptr_t)desc_addrs[i]);
> > +                hdrs[i] = (struct virtio_net_hdr_mrg_rxbuf *)
> > +                                        (uintptr_t)desc_addrs[i];
> > +                lens[i] = pkts[i]->pkt_len + dev->vhost_hlen;
> > +        }
> > +
> > +        UNROLL_PRAGMA(UNROLL_PRAGMA_PARAM)
> > +        for (i = 0; i < PACKED_BATCH_SIZE; i++)
> > +                virtio_enqueue_offload(pkts[i], &hdrs[i]->hdr);
> > +
> > +        vq->last_avail_idx += PACKED_BATCH_SIZE;
> > +        if (vq->last_avail_idx >= vq->size) {
> > +                vq->last_avail_idx -= vq->size;
> > +                vq->avail_wrap_counter ^= 1;
> > +        }
> > +
> > +        UNROLL_PRAGMA(UNROLL_PRAGMA_PARAM)
> > +        for (i = 0; i < PACKED_BATCH_SIZE; i++) {
> > +                rte_memcpy((void *)(uintptr_t)(desc_addrs[i] + buf_offset),
> > +                           rte_pktmbuf_mtod_offset(pkts[i], void *, 0),
> > +                           pkts[i]->pkt_len);
> > +        }
> > +
> > +        return 0;
> > +}
> > +
> >  static __rte_unused int16_t
> >  virtio_dev_rx_single_packed(struct virtio_net *dev, struct vhost_virtqueue *vq,
> >                              struct rte_mbuf *pkt)
> >
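P.S. For reference, here is a minimal, self-contained sketch (not taken from the patch; names and constants are illustrative) of the two early checks discussed above, assuming a 64-byte cache line and a 16-byte struct vring_packed_desc, which gives a batch size of 4 and a mask of 0x3:

    /* Stand-alone illustration of the batch alignment and wrap checks.
     * Constants only mirror the patch under the assumption of a 64-byte
     * cache line and a 16-byte descriptor (batch size 4, mask 0x3).
     */
    #include <stdint.h>
    #include <stdio.h>

    #define CACHE_LINE_SIZE 64
    #define DESC_SIZE       16  /* assumed sizeof(struct vring_packed_desc) */
    #define BATCH_SIZE      (CACHE_LINE_SIZE / DESC_SIZE)
    #define BATCH_MASK      (BATCH_SIZE - 1)

    /* Returns 1 when a batch starting at avail_idx begins on a cache-line
     * boundary and fits before the ring end, mirroring the two early
     * checks in the batch enqueue function. */
    static int
    batch_is_usable(uint16_t avail_idx, uint16_t ring_size)
    {
            if (avail_idx & BATCH_MASK)     /* not cache aligned */
                    return 0;
            if ((uint32_t)avail_idx + BATCH_SIZE > ring_size) /* past end */
                    return 0;
            return 1;
    }

    int
    main(void)
    {
            uint16_t ring_size = 256;
            uint16_t idx;

            for (idx = 0; idx < 8; idx++)
                    printf("avail_idx=%u -> %s\n", idx,
                           batch_is_usable(idx, ring_size) ? "batch" : "single");
            return 0;
    }

With these assumptions only indices 0, 4, 8, ... start a batch; every other index falls back to the single-packet path, which is the intent of the avail_idx & PACKED_BATCH_MASK test.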