Add support for copying scattered mbufs to the vring; this is done by
virtio_dev_scatter_rx(). The mbuf 'next' pointer is checked on the fly to
select the suitable function for receiving packets.
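
A minimal caller-side sketch of the new path (illustrative only, not part of
the patch): it assumes an application mbuf pool `mbuf_pool` and a `virtio_net`
handle `dev` already obtained from the vhost library, chains two segments by
hand so that `pkt->next != NULL`, and enqueues the packet. With this patch,
rte_vhost_enqueue_burst() detects the chained mbuf via check_scatter() and
takes the virtio_dev_scatter_rx() path.

	#include <rte_mbuf.h>
	#include <rte_virtio_net.h>

	static uint16_t
	enqueue_scattered_example(struct virtio_net *dev,
			struct rte_mempool *mbuf_pool)
	{
		struct rte_mbuf *head = rte_pktmbuf_alloc(mbuf_pool);
		struct rte_mbuf *tail = rte_pktmbuf_alloc(mbuf_pool);

		if (head == NULL || tail == NULL)
			return 0;

		/* Pretend each segment carries 64 bytes of payload. */
		head->data_len = 64;
		tail->data_len = 64;

		/* Chain the segments: this is what makes the mbuf "scattered". */
		head->next = tail;
		head->nb_segs = 2;
		head->pkt_len = head->data_len + tail->data_len;

		/* Dispatches to virtio_dev_scatter_rx() since head->next != NULL. */
		return rte_vhost_enqueue_burst(dev, VIRTIO_RXQ, &head, 1);
	}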

Signed-off-by: Changchun Ouyang <changchun.ouyang@intel.com>
---
 lib/librte_vhost/vhost_rxtx.c | 116 +++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 115 insertions(+), 1 deletion(-)

diff --git a/lib/librte_vhost/vhost_rxtx.c b/lib/librte_vhost/vhost_rxtx.c
index bb56ae1..3086bb4 100644
--- a/lib/librte_vhost/vhost_rxtx.c
+++ b/lib/librte_vhost/vhost_rxtx.c
@@ -46,7 +46,8 @@
  * This function adds buffers to the virtio devices RX virtqueue. Buffers can
  * be received from the physical port or from another virtio device. A packet
  * count is returned to indicate the number of packets that are succesfully
- * added to the RX queue. This function works when mergeable is disabled.
+ * added to the RX queue. This function works when mergeable is disabled and
+ * the mbuf is not scattered.
  */
 static inline uint32_t __attribute__((always_inline))
 virtio_dev_rx(struct virtio_net *dev, uint16_t queue_id,
@@ -447,6 +448,103 @@ fill_buf_vec(struct vhost_virtqueue *vq, uint16_t id, uint32_t *vec_idx)
 }

 /*
+ * This function works for scatter-gather RX.
+ */
+static inline uint32_t __attribute__((always_inline))
+virtio_dev_scatter_rx(struct virtio_net *dev, uint16_t queue_id,
+       struct rte_mbuf **pkts, uint32_t count)
+{
+       struct vhost_virtqueue *vq;
+       uint32_t pkt_idx = 0, entry_success = 0;
+       uint16_t avail_idx;
+       uint16_t res_base_idx, res_end_idx;
+       uint8_t success = 0;
+
+       LOG_DEBUG(VHOST_DATA, "(%"PRIu64") virtio_dev_scatter_rx()\n",
+               dev->device_fh);
+       if (unlikely(queue_id != VIRTIO_RXQ))
+               LOG_DEBUG(VHOST_DATA, "mq isn't supported in this version.\n");
+
+       vq = dev->virtqueue[VIRTIO_RXQ];
+       count = RTE_MIN((uint32_t)MAX_PKT_BURST, count);
+
+       if (count == 0)
+               return 0;
+
+       for (pkt_idx = 0; pkt_idx < count; pkt_idx++) {
+               uint32_t secure_len = 0;
+               uint32_t vec_idx = 0;
+               uint32_t pkt_len = pkts[pkt_idx]->pkt_len + vq->vhost_hlen;
+
+               do {
+                       /*
+                        * As many data cores may want access to available
+                        * buffers, they need to be reserved.
+                        */
+                       res_base_idx = vq->last_used_idx_res;
+                       avail_idx = *((volatile uint16_t *)&vq->avail->idx);
+
+                       if (unlikely(res_base_idx == avail_idx)) {
+                               LOG_DEBUG(VHOST_DATA,
+                                       "(%"PRIu64") Failed "
+                                       "to get enough desc from "
+                                       "vring\n",
+                                       dev->device_fh);
+                               return pkt_idx;
+                       } else {
+                               uint16_t wrapped_idx =
+                                       (res_base_idx) & (vq->size - 1);
+                               uint32_t idx = vq->avail->ring[wrapped_idx];
+
+                               update_secure_len(vq, idx, &secure_len);
+                       }
+
+                       if (pkt_len > secure_len) {
+                               LOG_DEBUG(VHOST_DATA,
+                                       "(%"PRIu64") Failed "
+                                       "to get enough desc from "
+                                       "vring\n",
+                                       dev->device_fh);
+                               return pkt_idx;
+                       }
+
+                       /* vq->last_used_idx_res is atomically updated. */
+                       success = rte_atomic16_cmpset(&vq->last_used_idx_res,
+                                                       res_base_idx,
+                                                       res_base_idx + 1);
+               } while (success == 0);
+
+               fill_buf_vec(vq, res_base_idx, &vec_idx);
+
+               res_end_idx = res_base_idx + 1;
+
+               entry_success = copy_from_mbuf_to_vring(dev, res_base_idx,
+                       res_end_idx, pkts[pkt_idx]);
+
+               rte_compiler_barrier();
+
+               /*
+                * Wait until it's our turn to add our buffer
+                * to the used ring.
+                */
+               while (unlikely(vq->last_used_idx != res_base_idx))
+                       rte_pause();
+
+               *(volatile uint16_t *)&vq->used->idx += entry_success;
+               vq->last_used_idx = res_end_idx;
+
+               /* flush used->idx update before we read avail->flags. */
+               rte_mb();
+
+               /* Kick the guest if necessary. */
+               if (!(vq->avail->flags & VRING_AVAIL_F_NO_INTERRUPT))
+                       eventfd_write((int)vq->callfd, 1);
+       }
+
+       return count;
+}
+
+/*
  * This function works for mergeable RX.
  */
 static inline uint32_t __attribute__((always_inline))
@@ -545,12 +643,28 @@ virtio_dev_merge_rx(struct virtio_net *dev, uint16_t queue_id,
        return count;
 }

+/*
+ * Return 1 if any mbuf is scattered, otherwise return 0.
+ */
+static inline uint32_t __attribute__((always_inline))
+check_scatter(struct rte_mbuf **pkts, uint16_t count)
+{
+       uint32_t i;
+       for (i = 0; i < count; i++) {
+               if (pkts[i]->next != NULL)
+                       return 1;
+       }
+       return 0;
+}
+
 uint16_t
 rte_vhost_enqueue_burst(struct virtio_net *dev, uint16_t queue_id,
        struct rte_mbuf **pkts, uint16_t count)
 {
        if (unlikely(dev->features & (1 << VIRTIO_NET_F_MRG_RXBUF)))
                return virtio_dev_merge_rx(dev, queue_id, pkts, count);
+       else if (unlikely(check_scatter(pkts, count) == 1))
+               return virtio_dev_scatter_rx(dev, queue_id, pkts, count);
        else
                return virtio_dev_rx(dev, queue_id, pkts, count);
 }
-- 
1.8.4.2
