Current virtio_dev_merge_rx just looks like the old rte_vhost_dequeue_burst,
twisted logic, that you can see same code block in quite many places.

However, the logic virtio_dev_merge_rx is quite similar to virtio_dev_rx.
The big difference is that the meregeable one could allocate more than one
available entries to hold the data. Fetching all available entries to vec_buf
at once makes the difference a bit bigger then.

Anyway, it could be simpler, just like what we did for virtio_dev_rx(). The
difference is that we need to update used ring properly, as there could
be more than one entries:

        while (1) {
                if (this_desc_has_no_room) {
                        this_desc = fetch_next_from_vec_buf();

                        if (it is the last of a desc chain) {
                                update_used_ring();
                        }
                }

                if (this_mbuf_has_drained_totally) {
                        this_mbuf = fetch_next_mbuf();

                        if (this_mbuf == NULL)
                                break;
                }

                COPY(this_desc, this_mbuf);
        }

It reduces quite many lines of code, but it just saves a few bytes of
code size (which is expected, btw):

    # without this patch
    $ size /path/to/vhost_rxtx.o
       text    data     bss     dec     hex filename
      36539       0       0   36539    8ebb /path/to/vhost_rxtx.o

    # with this patch
    $ size /path/to/vhost_rxtx.o
       text    data     bss     dec     hex filename
      36171       0       0   36171    8d4b /path/to/vhost_rxtx.o

Signed-off-by: Yuanhan Liu <yuanhan.liu at linux.intel.com>
---
 lib/librte_vhost/vhost_rxtx.c | 389 +++++++++++++++++-------------------------
 1 file changed, 158 insertions(+), 231 deletions(-)

diff --git a/lib/librte_vhost/vhost_rxtx.c b/lib/librte_vhost/vhost_rxtx.c
index cb29459..7464b6b 100644
--- a/lib/librte_vhost/vhost_rxtx.c
+++ b/lib/librte_vhost/vhost_rxtx.c
@@ -241,235 +241,194 @@ virtio_dev_rx(struct virtio_net *dev, uint16_t queue_id,
        return count;
 }

+static inline int
+fill_vec_buf(struct vhost_virtqueue *vq, uint32_t avail_idx,
+            uint32_t *allocated, uint32_t *vec_idx)
+{
+       uint16_t idx = vq->avail->ring[avail_idx & (vq->size - 1)];
+       uint32_t vec_id = *vec_idx;
+       uint32_t len    = *allocated;
+
+       while (1) {
+               if (vec_id >= BUF_VECTOR_MAX)
+                       return -1;
+
+               len += vq->desc[idx].len;
+               vq->buf_vec[vec_id].buf_addr = vq->desc[idx].addr;
+               vq->buf_vec[vec_id].buf_len  = vq->desc[idx].len;
+               vq->buf_vec[vec_id].desc_idx = idx;
+               vec_id++;
+
+               if ((vq->desc[idx].flags & VRING_DESC_F_NEXT) == 0)
+                       break;
+
+               idx = vq->desc[idx].next;
+       }
+
+       *allocated = len;
+       *vec_idx   = vec_id;
+
+       return 0;
+}
+
+/*
+ * As many data cores may want to access available buffers concurrently,
+ * they need to be reserved.
+ *
+ * Returns -1 on fail, 0 on success
+ */
+static inline int
+reserve_avail_buf_mergeable(struct vhost_virtqueue *vq, uint32_t size,
+                           uint16_t *start, uint16_t *end)
+{
+       uint16_t res_start_idx;
+       uint16_t res_cur_idx;
+       uint16_t avail_idx;
+       uint32_t allocated;
+       uint32_t vec_idx;
+       uint16_t tries;
+
+again:
+       res_start_idx = vq->last_used_idx_res;
+       res_cur_idx  = res_start_idx;
+
+       allocated = 0;
+       vec_idx   = 0;
+       tries     = 0;
+       while (1) {
+               avail_idx = *((volatile uint16_t *)&vq->avail->idx);
+               if (unlikely(res_cur_idx == avail_idx)) {
+                       LOG_DEBUG(VHOST_DATA, "(%"PRIu64") Failed "
+                               "to get enough desc from vring\n",
+                               dev->device_fh);
+                       return -1;
+               }
+
+               if (fill_vec_buf(vq, res_cur_idx, &allocated, &vec_idx) < 0)
+                       return -1;
+
+               res_cur_idx++;
+               tries++;
+
+               if (allocated >= size)
+                       break;
+
+               /*
+                * if we tried all available ring items, and still
+                * can't get enough buf, it means something abnormal
+                * happened.
+                */
+               if (tries >= vq->size)
+                       return -1;
+       }
+
+       /*
+        * update vq->last_used_idx_res atomically.
+        * retry again if failed.
+        */
+       if (rte_atomic16_cmpset(&vq->last_used_idx_res,
+                               res_start_idx, res_cur_idx) == 0)
+               goto again;
+
+       *start = res_start_idx;
+       *end   = res_cur_idx;
+       return 0;
+}
+
 static inline uint32_t __attribute__((always_inline))
-copy_from_mbuf_to_vring(struct virtio_net *dev, uint32_t queue_id,
-                       uint16_t res_base_idx, uint16_t res_end_idx,
-                       struct rte_mbuf *pkt)
+copy_mbuf_to_desc_mergeable(struct virtio_net *dev, struct vhost_virtqueue *vq,
+                           uint16_t res_start_idx, uint16_t res_end_idx,
+                           struct rte_mbuf *pkt)
 {
+       struct virtio_net_hdr_mrg_rxbuf virtio_hdr = {{0, 0, 0, 0, 0, 0}, 0};
        uint32_t vec_idx = 0;
-       uint32_t entry_success = 0;
-       struct vhost_virtqueue *vq;
-       /* The virtio_hdr is initialised to 0. */
-       struct virtio_net_hdr_mrg_rxbuf virtio_hdr = {
-               {0, 0, 0, 0, 0, 0}, 0};
-       uint16_t cur_idx = res_base_idx;
-       uint64_t vb_addr = 0;
-       uint64_t vb_hdr_addr = 0;
-       uint32_t seg_offset = 0;
-       uint32_t vb_offset = 0;
-       uint32_t seg_avail;
-       uint32_t vb_avail;
-       uint32_t cpy_len, entry_len;
+       uint16_t cur_idx = res_start_idx;
+       uint64_t desc_addr;
+       uint32_t mbuf_offset, mbuf_avail;
+       uint32_t desc_offset, desc_avail;
+       uint32_t cpy_len;
+       uint16_t desc_idx, used_idx;
+       uint32_t nr_used = 0;

        if (pkt == NULL)
                return 0;

-       LOG_DEBUG(VHOST_DATA, "(%"PRIu64") Current Index %d| "
-               "End Index %d\n",
+       LOG_DEBUG(VHOST_DATA,
+               "(%"PRIu64") Current Index %d| End Index %d\n",
                dev->device_fh, cur_idx, res_end_idx);

-       /*
-        * Convert from gpa to vva
-        * (guest physical addr -> vhost virtual addr)
-        */
-       vq = dev->virtqueue[queue_id];
+       desc_addr = gpa_to_vva(dev, vq->buf_vec[vec_idx].buf_addr);

-       vb_addr = gpa_to_vva(dev, vq->buf_vec[vec_idx].buf_addr);
-       vb_hdr_addr = vb_addr;
-
-       /* Prefetch buffer address. */
-       rte_prefetch0((void *)(uintptr_t)vb_addr);
+       rte_prefetch0((void *)(uintptr_t)desc_addr);

-       virtio_hdr.num_buffers = res_end_idx - res_base_idx;
+       virtio_hdr.num_buffers = res_end_idx - res_start_idx;

        LOG_DEBUG(VHOST_DATA, "(%"PRIu64") RX: Num merge buffers %d\n",
                dev->device_fh, virtio_hdr.num_buffers);

-       rte_memcpy((void *)(uintptr_t)vb_hdr_addr,
+       rte_memcpy((void *)(uintptr_t)desc_addr,
                (const void *)&virtio_hdr, vq->vhost_hlen);
+       PRINT_PACKET(dev, (uintptr_t)desc_addr, vq->vhost_hlen, 0);

-       PRINT_PACKET(dev, (uintptr_t)vb_hdr_addr, vq->vhost_hlen, 1);
+       desc_avail  = vq->buf_vec[vec_idx].buf_len - vq->vhost_hlen;
+       desc_offset = vq->vhost_hlen;

-       seg_avail = rte_pktmbuf_data_len(pkt);
-       vb_offset = vq->vhost_hlen;
-       vb_avail = vq->buf_vec[vec_idx].buf_len - vq->vhost_hlen;
+       mbuf_avail  = rte_pktmbuf_data_len(pkt);
+       mbuf_offset = 0;
+       while (1) {
+               /* done with current desc buf, get the next one */
+               if (desc_avail == 0) {
+                       desc_idx = vq->buf_vec[vec_idx].desc_idx;

-       entry_len = vq->vhost_hlen;
+                       if ((vq->desc[desc_idx].flags & VRING_DESC_F_NEXT) == 
0) {
+                               /* Update used ring with desc information */
+                               used_idx = cur_idx++ & (vq->size - 1);
+                               vq->used->ring[used_idx].id  = desc_idx;
+                               vq->used->ring[used_idx].len = desc_offset;

-       if (vb_avail == 0) {
-               uint32_t desc_idx =
-                       vq->buf_vec[vec_idx].desc_idx;
+                               nr_used++;
+                       }

-               if ((vq->desc[desc_idx].flags
-                       & VRING_DESC_F_NEXT) == 0) {
-                       /* Update used ring with desc information */
-                       vq->used->ring[cur_idx & (vq->size - 1)].id
-                               = vq->buf_vec[vec_idx].desc_idx;
-                       vq->used->ring[cur_idx & (vq->size - 1)].len
-                               = entry_len;
+                       vec_idx++;
+                       desc_addr = gpa_to_vva(dev, 
vq->buf_vec[vec_idx].buf_addr);

-                       entry_len = 0;
-                       cur_idx++;
-                       entry_success++;
+                       /* Prefetch buffer address. */
+                       rte_prefetch0((void *)(uintptr_t)desc_addr);
+                       desc_offset = 0;
+                       desc_avail  = vq->buf_vec[vec_idx].buf_len;
                }

-               vec_idx++;
-               vb_addr = gpa_to_vva(dev, vq->buf_vec[vec_idx].buf_addr);
-
-               /* Prefetch buffer address. */
-               rte_prefetch0((void *)(uintptr_t)vb_addr);
-               vb_offset = 0;
-               vb_avail = vq->buf_vec[vec_idx].buf_len;
-       }
-
-       cpy_len = RTE_MIN(vb_avail, seg_avail);
-
-       while (cpy_len > 0) {
-               /* Copy mbuf data to vring buffer */
-               rte_memcpy((void *)(uintptr_t)(vb_addr + vb_offset),
-                       rte_pktmbuf_mtod_offset(pkt, const void *, seg_offset),
-                       cpy_len);
-
-               PRINT_PACKET(dev,
-                       (uintptr_t)(vb_addr + vb_offset),
-                       cpy_len, 0);
-
-               seg_offset += cpy_len;
-               vb_offset += cpy_len;
-               seg_avail -= cpy_len;
-               vb_avail -= cpy_len;
-               entry_len += cpy_len;
-
-               if (seg_avail != 0) {
-                       /*
-                        * The virtio buffer in this vring
-                        * entry reach to its end.
-                        * But the segment doesn't complete.
-                        */
-                       if ((vq->desc[vq->buf_vec[vec_idx].desc_idx].flags &
-                               VRING_DESC_F_NEXT) == 0) {
-                               /* Update used ring with desc information */
-                               vq->used->ring[cur_idx & (vq->size - 1)].id
-                                       = vq->buf_vec[vec_idx].desc_idx;
-                               vq->used->ring[cur_idx & (vq->size - 1)].len
-                                       = entry_len;
-                               entry_len = 0;
-                               cur_idx++;
-                               entry_success++;
-                       }
-
-                       vec_idx++;
-                       vb_addr = gpa_to_vva(dev,
-                               vq->buf_vec[vec_idx].buf_addr);
-                       vb_offset = 0;
-                       vb_avail = vq->buf_vec[vec_idx].buf_len;
-                       cpy_len = RTE_MIN(vb_avail, seg_avail);
-               } else {
-                       /*
-                        * This current segment complete, need continue to
-                        * check if the whole packet complete or not.
-                        */
+               /* done with current mbuf, get the next one */
+               if (mbuf_avail == 0) {
                        pkt = pkt->next;
-                       if (pkt != NULL) {
-                               /*
-                                * There are more segments.
-                                */
-                               if (vb_avail == 0) {
-                                       /*
-                                        * This current buffer from vring is
-                                        * used up, need fetch next buffer
-                                        * from buf_vec.
-                                        */
-                                       uint32_t desc_idx =
-                                               vq->buf_vec[vec_idx].desc_idx;
-
-                                       if ((vq->desc[desc_idx].flags &
-                                               VRING_DESC_F_NEXT) == 0) {
-                                               uint16_t wrapped_idx =
-                                                       cur_idx & (vq->size - 
1);
-                                               /*
-                                                * Update used ring with the
-                                                * descriptor information
-                                                */
-                                               vq->used->ring[wrapped_idx].id
-                                                       = desc_idx;
-                                               vq->used->ring[wrapped_idx].len
-                                                       = entry_len;
-                                               entry_success++;
-                                               entry_len = 0;
-                                               cur_idx++;
-                                       }
-
-                                       /* Get next buffer from buf_vec. */
-                                       vec_idx++;
-                                       vb_addr = gpa_to_vva(dev,
-                                               vq->buf_vec[vec_idx].buf_addr);
-                                       vb_avail =
-                                               vq->buf_vec[vec_idx].buf_len;
-                                       vb_offset = 0;
-                               }
-
-                               seg_offset = 0;
-                               seg_avail = rte_pktmbuf_data_len(pkt);
-                               cpy_len = RTE_MIN(vb_avail, seg_avail);
-                       } else {
-                               /*
-                                * This whole packet completes.
-                                */
-                               /* Update used ring with desc information */
-                               vq->used->ring[cur_idx & (vq->size - 1)].id
-                                       = vq->buf_vec[vec_idx].desc_idx;
-                               vq->used->ring[cur_idx & (vq->size - 1)].len
-                                       = entry_len;
-                               entry_success++;
+                       if (!pkt)
                                break;
-                       }
-               }
-       }
-
-       return entry_success;
-}

-static inline void __attribute__((always_inline))
-update_secure_len(struct vhost_virtqueue *vq, uint32_t id,
-       uint32_t *secure_len, uint32_t *vec_idx)
-{
-       uint16_t wrapped_idx = id & (vq->size - 1);
-       uint32_t idx = vq->avail->ring[wrapped_idx];
-       uint8_t next_desc;
-       uint32_t len = *secure_len;
-       uint32_t vec_id = *vec_idx;
+                       mbuf_offset = 0;
+                       mbuf_avail  = rte_pktmbuf_data_len(pkt);
+               }

-       do {
-               next_desc = 0;
-               len += vq->desc[idx].len;
-               vq->buf_vec[vec_id].buf_addr = vq->desc[idx].addr;
-               vq->buf_vec[vec_id].buf_len = vq->desc[idx].len;
-               vq->buf_vec[vec_id].desc_idx = idx;
-               vec_id++;
+               COPY(desc_addr + desc_offset,
+                       rte_pktmbuf_mtod_offset(pkt, uint64_t, mbuf_offset));
+               PRINT_PACKET(dev, (uintptr_t)(desc_addr + desc_offset),
+                            cpy_len, 0);
+       }

-               if (vq->desc[idx].flags & VRING_DESC_F_NEXT) {
-                       idx = vq->desc[idx].next;
-                       next_desc = 1;
-               }
-       } while (next_desc);
+       used_idx = cur_idx & (vq->size - 1);
+       vq->used->ring[used_idx].id = vq->buf_vec[vec_idx].desc_idx;
+       vq->used->ring[used_idx].len = desc_offset;
+       nr_used++;

-       *secure_len = len;
-       *vec_idx = vec_id;
+       return nr_used;
 }

-/*
- * This function works for mergeable RX.
- */
 static inline uint32_t __attribute__((always_inline))
 virtio_dev_merge_rx(struct virtio_net *dev, uint16_t queue_id,
        struct rte_mbuf **pkts, uint32_t count)
 {
        struct vhost_virtqueue *vq;
-       uint32_t pkt_idx = 0, entry_success = 0;
-       uint16_t avail_idx;
-       uint16_t res_base_idx, res_cur_idx;
-       uint8_t success = 0;
+       uint32_t pkt_idx = 0, nr_used = 0;
+       uint16_t start, end;

        LOG_DEBUG(VHOST_DATA, "(%"PRIu64") virtio_dev_merge_rx()\n",
                dev->device_fh);
@@ -485,62 +444,30 @@ virtio_dev_merge_rx(struct virtio_net *dev, uint16_t 
queue_id,
                return 0;

        count = RTE_MIN((uint32_t)MAX_PKT_BURST, count);
-
        if (count == 0)
                return 0;

        for (pkt_idx = 0; pkt_idx < count; pkt_idx++) {
                uint32_t pkt_len = pkts[pkt_idx]->pkt_len + vq->vhost_hlen;

-               do {
-                       /*
-                        * As many data cores may want access to available
-                        * buffers, they need to be reserved.
-                        */
-                       uint32_t secure_len = 0;
-                       uint32_t vec_idx = 0;
-
-                       res_base_idx = vq->last_used_idx_res;
-                       res_cur_idx = res_base_idx;
-
-                       do {
-                               avail_idx = *((volatile uint16_t 
*)&vq->avail->idx);
-                               if (unlikely(res_cur_idx == avail_idx)) {
-                                       LOG_DEBUG(VHOST_DATA,
-                                               "(%"PRIu64") Failed "
-                                               "to get enough desc from "
-                                               "vring\n",
-                                               dev->device_fh);
-                                       goto merge_rx_exit;
-                               } else {
-                                       update_secure_len(vq, res_cur_idx, 
&secure_len, &vec_idx);
-                                       res_cur_idx++;
-                               }
-                       } while (pkt_len > secure_len);
-
-                       /* vq->last_used_idx_res is atomically updated. */
-                       success = rte_atomic16_cmpset(&vq->last_used_idx_res,
-                                                       res_base_idx,
-                                                       res_cur_idx);
-               } while (success == 0);
-
-               entry_success = copy_from_mbuf_to_vring(dev, queue_id,
-                       res_base_idx, res_cur_idx, pkts[pkt_idx]);
+               if (reserve_avail_buf_mergeable(vq, pkt_len, &start, &end) < 0)
+                       break;

+               nr_used = copy_mbuf_to_desc_mergeable(dev, vq, start, end,
+                                                     pkts[pkt_idx]);
                rte_compiler_barrier();

                /*
                 * Wait until it's our turn to add our buffer
                 * to the used ring.
                 */
-               while (unlikely(vq->last_used_idx != res_base_idx))
+               while (unlikely(vq->last_used_idx != start))
                        rte_pause();

-               *(volatile uint16_t *)&vq->used->idx += entry_success;
-               vq->last_used_idx = res_cur_idx;
+               *(volatile uint16_t *)&vq->used->idx += nr_used;
+               vq->last_used_idx = end;
        }

-merge_rx_exit:
        if (likely(pkt_idx)) {
                /* flush used->idx update before we read avail->flags. */
                rte_mb();
-- 
1.9.0

Reply via email to