Add the functions to support transmitting packets.

The TX path enqueues packets into the RX virtqueue of the other guest using
mergeable buffers: descriptor chains are reserved from the avail ring, packet
data is copied into the remote guest's memory, and completions are recorded
through a shadow used ring that is flushed once per burst. eth_vhostpci_tx()
is installed as the device's tx_pkt_burst callback.

Signed-off-by: Zhiyong Yang <zhiyong.y...@intel.com>
---
 drivers/net/vhostpci/vhostpci_ethdev.c | 352 +++++++++++++++++++++++++++++++++
 1 file changed, 352 insertions(+)

diff --git a/drivers/net/vhostpci/vhostpci_ethdev.c b/drivers/net/vhostpci/vhostpci_ethdev.c
index 06e3f5c50..f233d85a8 100644
--- a/drivers/net/vhostpci/vhostpci_ethdev.c
+++ b/drivers/net/vhostpci/vhostpci_ethdev.c
@@ -53,6 +53,12 @@
 #define VHOSTPCI_MAX_PKT_BURST 32
 #define VHOSTPCI_BUF_VECTOR_MAX 256
 
+/* avoid write operation when necessary, to lessen cache issues */
+#define ASSIGN_UNLESS_EQUAL(var, val) do {     \
+       if ((var) != (val))                     \
+               (var) = (val);                  \
+} while (0)
+
 static void
 vhostpci_dev_info_get(struct rte_eth_dev *dev,
                struct rte_eth_dev_info *dev_info);
@@ -100,6 +106,10 @@ static uint16_t
 vhostpci_dequeue_burst(struct vhostpci_net *dev, uint16_t queue_id,
        struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count);
 
+static uint16_t
+vhostpci_enqueue_burst(struct vhostpci_net *vpnet, uint16_t queue_id,
+       struct rte_mbuf **pkts, uint16_t count);
+
 static int
 vhostpci_dev_start(struct rte_eth_dev *dev);
 
@@ -321,6 +331,346 @@ vhostpci_dev_tx_queue_setup(struct rte_eth_dev *dev, uint16_t tx_queue_id,
        return 0;
 }
 
+
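+/* Flush the small copies batched up while enqueueing the current burst. */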
+static inline void
+do_data_copy_enqueue(struct vhostpci_virtqueue *vq)
+{
+       struct batch_copy_elem *elem = vq->batch_copy_elems;
+       uint16_t count = vq->batch_copy_nb_elems;
+       int i;
+
+       for (i = 0; i < count; i++)
+               rte_memcpy(elem[i].dst, elem[i].src, elem[i].len);
+}
+
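+/*
+ * Copy one mbuf chain into the descriptor chains previously reserved in
+ * buf_vec, writing num_buffers into the virtio-net header placed in the
+ * first descriptor. Small copies are deferred to the batch copy array.
+ */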
+static __rte_always_inline int
+copy_mbuf_to_desc_mergeable(struct vhostpci_net *dev,
+               struct vhostpci_virtqueue *vq, struct rte_mbuf *m,
+               struct buf_vector *buf_vec, uint16_t num_buffers)
+{
+       uint32_t vec_idx = 0;
+       uint64_t desc_addr;
+       uint32_t mbuf_offset, mbuf_avail;
+       uint32_t desc_offset, desc_avail;
+       uint32_t cpy_len;
+       uint64_t hdr_addr;
+       struct batch_copy_elem *batch_copy = vq->batch_copy_elems;
+       uint16_t copy_nb = vq->batch_copy_nb_elems;
+       int error = 0;
+
+       if (unlikely(m == NULL)) {
+               error = -1;
+               goto out;
+       }
+
+       desc_addr = remote_gpa_to_vva(dev, buf_vec[vec_idx].buf_addr);
+
+       if (buf_vec[vec_idx].buf_len < dev->vhost_hlen || !desc_addr) {
+               error = -1;
+               goto out;
+       }
+
+       hdr_addr = desc_addr;
+       rte_prefetch0((void *)(uintptr_t)hdr_addr);
+
+       desc_avail  = buf_vec[vec_idx].buf_len - dev->vhost_hlen;
+       desc_offset = dev->vhost_hlen;
+
+       mbuf_avail  = rte_pktmbuf_data_len(m);
+       mbuf_offset = 0;
+       while (mbuf_avail != 0 || m->next != NULL) {
+               /* done with current desc buf, get the next one */
+               if (desc_avail == 0) {
+                       vec_idx++;
+                       desc_addr = remote_gpa_to_vva(dev,
+                                       buf_vec[vec_idx].buf_addr);
+
+                       if (unlikely(!desc_addr)) {
+                               error = -1;
+                               goto out;
+                       }
+
+                       /* Prefetch buffer address. */
+                       rte_prefetch0((void *)(uintptr_t)desc_addr);
+                       desc_offset = 0;
+                       desc_avail  = buf_vec[vec_idx].buf_len;
+               }
+
+               /* done with current mbuf, get the next one */
+               if (mbuf_avail == 0) {
+                       m = m->next;
+                       mbuf_offset = 0;
+                       mbuf_avail  = rte_pktmbuf_data_len(m);
+               }
+
+               if (hdr_addr) {
+                       struct virtio_net_hdr_mrg_rxbuf *hdr;
+
+                       hdr = (struct virtio_net_hdr_mrg_rxbuf *)(uintptr_t)
+                               hdr_addr;
+                       ASSIGN_UNLESS_EQUAL(hdr->num_buffers, num_buffers);
+                       hdr_addr = 0;
+               }
+
+               cpy_len = RTE_MIN(desc_avail, mbuf_avail);
+
+               if (likely(cpy_len > MAX_BATCH_LEN || copy_nb >= vq->size)) {
+                       rte_memcpy((void *)((uintptr_t)(desc_addr +
+                                                       desc_offset)),
+                               rte_pktmbuf_mtod_offset(m, void *, mbuf_offset),
+                               cpy_len);
+
+               } else {
+                       batch_copy[copy_nb].dst =
+                               (void *)((uintptr_t)(desc_addr + desc_offset));
+                       batch_copy[copy_nb].src =
+                               rte_pktmbuf_mtod_offset(m, void *, mbuf_offset);
+                       batch_copy[copy_nb].len = cpy_len;
+                       copy_nb++;
+               }
+
+               mbuf_avail  -= cpy_len;
+               mbuf_offset += cpy_len;
+               desc_avail  -= cpy_len;
+               desc_offset += cpy_len;
+       }
+
+out:
+       vq->batch_copy_nb_elems = copy_nb;
+
+       return error;
+}
+
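+/*
+ * Walk the descriptor chain referenced by avail ring entry avail_idx and
+ * record each descriptor's address and length in buf_vec; return the chain
+ * head index and the total chain length to the caller.
+ */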
+static __rte_always_inline int
+fill_vec_buf(struct vhostpci_virtqueue *vq,
+               uint32_t avail_idx, uint32_t *vec_idx,
+               struct buf_vector *buf_vec, uint16_t *desc_chain_head,
+               uint16_t *desc_chain_len)
+{
+       uint16_t idx = vq->avail->ring[avail_idx & (vq->size - 1)];
+       uint32_t vec_id = *vec_idx;
+       uint32_t len = 0;
+       struct vring_desc *descs = vq->desc;
+
+       *desc_chain_head = idx;
+
+       while (1) {
+               if (unlikely(vec_id >= VHOSTPCI_BUF_VECTOR_MAX ||
+                       idx >= vq->size))
+                       return -1;
+
+               len += descs[idx].len;
+               buf_vec[vec_id].buf_addr = descs[idx].addr;
+               buf_vec[vec_id].buf_len  = descs[idx].len;
+               buf_vec[vec_id].desc_idx = idx;
+               vec_id++;
+
+               if ((descs[idx].flags & VRING_DESC_F_NEXT) == 0)
+                       break;
+
+               idx = descs[idx].next;
+       }
+
+       *desc_chain_len = len;
+       *vec_idx = vec_id;
+
+       return 0;
+}
+
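+/* Record a completed descriptor chain in the shadow used ring. */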
+static __rte_always_inline void
+update_shadow_used_ring(struct vhostpci_virtqueue *vq, uint16_t desc_idx,
+                       uint16_t len)
+{
+       uint16_t i = vq->shadow_used_idx++;
+
+       vq->shadow_used_ring[i].id  = desc_idx;
+       vq->shadow_used_ring[i].len = len;
+}
+
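+/*
+ * Reserve enough descriptor chains to hold 'size' bytes, filling buf_vec and
+ * the shadow used ring; return -1 if the avail ring cannot provide enough
+ * buffers.
+ */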
+static inline int
+reserve_avail_buf_mergeable(struct vhostpci_virtqueue *vq,
+                           uint32_t size, struct buf_vector *buf_vec,
+                           uint16_t *num_buffers, uint16_t avail_head)
+{
+       uint16_t cur_idx;
+       uint32_t vec_idx = 0;
+       uint16_t tries = 0;
+
+       uint16_t head_idx = 0;
+       uint16_t len = 0;
+
+       *num_buffers = 0;
+       cur_idx  = vq->last_avail_idx;
+
+       while (size > 0) {
+               if (unlikely(cur_idx == avail_head))
+                       return -1;
+
+               if (unlikely(fill_vec_buf(vq, cur_idx, &vec_idx, buf_vec,
+                               &head_idx, &len) < 0))
+                       return -1;
+               len = RTE_MIN(len, size);
+               update_shadow_used_ring(vq, head_idx, len);
+               size -= len;
+
+               cur_idx++;
+               tries++;
+               *num_buffers += 1;
+
+               /**
+                * if we tried all available ring items, and still
+                * can't get enough buf, it means something abnormal
+                * happened.
+                */
+               if (unlikely(tries >= vq->size))
+                       return -1;
+       }
+
+       return 0;
+}
+
+static __rte_always_inline void
+do_flush_shadow_used_ring(struct vhostpci_virtqueue *vq, uint16_t to,
+               uint16_t from, uint16_t size)
+{
+       rte_memcpy(&vq->used->ring[to], &vq->shadow_used_ring[from],
+                       size * sizeof(struct vring_used_elem));
+}
+
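+/*
+ * Copy the shadow used ring into the real used ring, splitting the copy when
+ * it wraps past the end of the ring, then publish the new used index.
+ */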
+static __rte_always_inline void
+flush_shadow_used_ring(struct vhostpci_virtqueue *vq)
+{
+       uint16_t used_idx = vq->last_used_idx & (vq->size - 1);
+
+       if (used_idx + vq->shadow_used_idx <= vq->size) {
+               do_flush_shadow_used_ring(vq, used_idx, 0,
+                                         vq->shadow_used_idx);
+       } else {
+               uint16_t size;
+
+               /* update used ring interval [used_idx, vq->size] */
+               size = vq->size - used_idx;
+               do_flush_shadow_used_ring(vq, used_idx, 0, size);
+
+               /* update the left half used ring interval [0, left_size] */
+               do_flush_shadow_used_ring(vq, 0, size,
+                                         vq->shadow_used_idx - size);
+       }
+       vq->last_used_idx += vq->shadow_used_idx;
+
+       rte_smp_wmb();
+
+       *(volatile uint16_t *)&vq->used->idx += vq->shadow_used_idx;
+}
+
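+/*
+ * Enqueue a burst of mbufs into the peer's RX virtqueue with mergeable
+ * buffers: reserve descriptor chains, copy packet data into remote guest
+ * memory, then flush the batched copies and the shadow used ring.
+ */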
+static __rte_always_inline uint32_t
+vhostpci_dev_merge_rx(struct vhostpci_net *dev, uint16_t queue_id,
+               struct rte_mbuf **pkts, uint32_t count)
+{
+       struct vhostpci_virtqueue *vq;
+       uint32_t pkt_idx = 0;
+       uint16_t num_buffers;
+       struct buf_vector buf_vec[VHOSTPCI_BUF_VECTOR_MAX];
+       uint16_t avail_head;
+
+       if (unlikely(!is_valid_virt_queue_idx(queue_id, 0, dev->nr_vring)))
+               return 0;
+
+       vq = dev->virtqueue[queue_id];
+       if (unlikely(vq->enabled == 0))
+               return 0;
+
+       count = RTE_MIN((uint32_t)VHOSTPCI_MAX_PKT_BURST, count);
+       if (count == 0)
+               return 0;
+
+       vq->batch_copy_nb_elems = 0;
+
+       rte_prefetch0(&vq->avail->ring[vq->last_avail_idx & (vq->size - 1)]);
+
+       vq->shadow_used_idx = 0;
+       avail_head = *((volatile uint16_t *)&vq->avail->idx);
+       for (pkt_idx = 0; pkt_idx < count; pkt_idx++) {
+               uint32_t pkt_len = pkts[pkt_idx]->pkt_len + dev->vhost_hlen;
+
+               if (unlikely(reserve_avail_buf_mergeable(vq, pkt_len, buf_vec,
+                       &num_buffers, avail_head) < 0)) {
+                       vq->shadow_used_idx -= num_buffers;
+                       break;
+               }
+
+               if (copy_mbuf_to_desc_mergeable(dev, vq, pkts[pkt_idx],
+                                               buf_vec, num_buffers) < 0) {
+                       vq->shadow_used_idx -= num_buffers;
+                       break;
+               }
+
+               vq->last_avail_idx += num_buffers;
+       }
+
+       do_data_copy_enqueue(vq);
+
+       if (likely(vq->shadow_used_idx)) {
+               flush_shadow_used_ring(vq);
+
+               /* flush used->idx update before we read avail->flags. */
+               rte_mb();
+       }
+
+       return pkt_idx;
+}
+
+static uint16_t
+vhostpci_enqueue_burst(struct vhostpci_net *vpnet, uint16_t queue_id,
+                      struct rte_mbuf **pkts, uint16_t count)
+{
+       return vhostpci_dev_merge_rx(vpnet, queue_id, pkts, count);
+}
+
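+/*
+ * ethdev TX burst callback: enqueue packets into the RX queue of the other
+ * guest, update the queue statistics and free the transmitted mbufs.
+ */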
+static uint16_t
+eth_vhostpci_tx(void *q, struct rte_mbuf **bufs, uint16_t nb_bufs)
+{
+       struct vhostpci_queue *r = q;
+       uint16_t i, nb_tx = 0;
+       uint16_t nb_send = nb_bufs;
+
+       if (unlikely(rte_atomic32_read(&r->allow_queuing) == 0))
+               return 0;
+
+       rte_atomic32_set(&r->while_queuing, 1);
+
+       if (unlikely(rte_atomic32_read(&r->allow_queuing) == 0))
+               goto out;
+
+       /* Enqueue packets to the RX queue in the other guest. */
+       while (nb_send) {
+               uint16_t nb_pkts;
+               uint16_t num = (uint16_t)RTE_MIN(nb_send,
+                                                VHOSTPCI_MAX_PKT_BURST);
+
+               nb_pkts = vhostpci_enqueue_burst(r->vpnet, r->virtqueue_id,
+                                               &bufs[nb_tx], num);
+
+               nb_tx += nb_pkts;
+               nb_send -= nb_pkts;
+               if (nb_pkts < num)
+                       break;
+       }
+
+       r->stats.pkts += nb_tx;
+       r->stats.missed_pkts += nb_bufs - nb_tx;
+
+       for (i = 0; likely(i < nb_tx); i++)
+               r->stats.bytes += bufs[i]->pkt_len;
+
+       for (i = 0; likely(i < nb_tx); i++)
+               rte_pktmbuf_free(bufs[i]);
+out:
+       rte_atomic32_set(&r->while_queuing, 0);
+
+       return nb_tx;
+}
+
 static __rte_always_inline void
 update_used_ring(struct vhostpci_virtqueue *vq,
                 uint32_t used_idx, uint32_t desc_idx)
@@ -1027,6 +1377,8 @@ eth_vhostpci_dev_init(struct rte_eth_dev *eth_dev)
                        vhostpci_interrupt_handler, eth_dev);
 
        eth_dev->rx_pkt_burst = &eth_vhostpci_rx;
+       eth_dev->tx_pkt_burst = &eth_vhostpci_tx;
+
        return 0;
 }
 
-- 
2.13.3
