From: Long Li <lon...@microsoft.com>

Instead of allocating mbufs one by one during RX, use
rte_pktmbuf_alloc_bulk() to allocate them in a batch.

With this patch, there are no measurable performance improvements in
benchmarks. However, it should save CPU cycles on mbuf allocation and
reduce potential lock contention on the mempool in real-world
applications.
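
For reference, below is a minimal, self-contained sketch of the batched
replenish pattern this patch adopts. It is not the driver code itself:
post() is a hypothetical stand-in for mana_post_rx_wqe(), and
REPLENISH_BULK mirrors MANA_MBUF_BULK; only rte_pktmbuf_alloc_bulk() and
rte_pktmbuf_free_bulk() are real DPDK APIs.

#include <stdint.h>
#include <rte_common.h>
#include <rte_mbuf.h>

#define REPLENISH_BULK 32u

/* Hypothetical per-descriptor post hook, standing in for mana_post_rx_wqe() */
typedef int (*post_one_rx_desc_t)(struct rte_mbuf *mbuf);

static int
replenish_rx(struct rte_mempool *mp, uint32_t count, post_one_rx_desc_t post)
{
	struct rte_mbuf *mbufs[REPLENISH_BULK];
	uint32_t i = 0, batch;
	int ret = 0;

	while (count > 0) {
		batch = RTE_MIN(count, REPLENISH_BULK);

		/* All-or-nothing: returns 0 only if every mbuf was allocated */
		ret = rte_pktmbuf_alloc_bulk(mp, mbufs, batch);
		if (ret)
			break;

		for (i = 0; i < batch; i++) {
			ret = post(mbufs[i]);
			if (ret)
				break;
		}

		/* Return any mbufs that could not be posted back to the pool */
		if (i < batch)
			rte_pktmbuf_free_bulk(&mbufs[i], batch - i);

		if (ret)
			break;
		count -= batch;
	}

	return ret;
}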

Signed-off-by: Long Li <lon...@microsoft.com>
---
v2:
use rte_calloc_socket() in place of rte_calloc()

v3:
add more comments explaining the benefit of doing alloc_bulk.
free mbufs that failed to post

v4:
replace rte_calloc_socket() with a fixed array on the stack

 drivers/net/mana/rx.c | 76 ++++++++++++++++++++++++++++---------------
 1 file changed, 50 insertions(+), 26 deletions(-)

diff --git a/drivers/net/mana/rx.c b/drivers/net/mana/rx.c
index acad5e26cd..4fc0f789d8 100644
--- a/drivers/net/mana/rx.c
+++ b/drivers/net/mana/rx.c
@@ -59,9 +59,8 @@ mana_rq_ring_doorbell(struct mana_rxq *rxq)
 }
 
 static int
-mana_alloc_and_post_rx_wqe(struct mana_rxq *rxq)
+mana_post_rx_wqe(struct mana_rxq *rxq, struct rte_mbuf *mbuf)
 {
-       struct rte_mbuf *mbuf = NULL;
        struct gdma_sgl_element sgl[1];
        struct gdma_work_request request;
        uint32_t wqe_size_in_bu;
@@ -69,12 +68,6 @@ mana_alloc_and_post_rx_wqe(struct mana_rxq *rxq)
        int ret;
        struct mana_mr_cache *mr;
 
-       mbuf = rte_pktmbuf_alloc(rxq->mp);
-       if (!mbuf) {
-               rxq->stats.nombuf++;
-               return -ENOMEM;
-       }
-
        mr = mana_alloc_pmd_mr(&rxq->mr_btree, priv, mbuf);
        if (!mr) {
                DP_LOG(ERR, "failed to register RX MR");
@@ -120,20 +113,33 @@ mana_alloc_and_post_rx_wqe(struct mana_rxq *rxq)
 /*
  * Post work requests for a Rx queue.
  */
+#define MANA_MBUF_BULK 32u
 static int
-mana_alloc_and_post_rx_wqes(struct mana_rxq *rxq)
+mana_alloc_and_post_rx_wqes(struct mana_rxq *rxq, uint32_t count)
 {
        int ret;
-       uint32_t i;
+       uint32_t i, batch_count;
+       struct rte_mbuf *mbufs[MANA_MBUF_BULK];
+
+more_mbufs:
+       batch_count = RTE_MIN(count, MANA_MBUF_BULK);
+       ret = rte_pktmbuf_alloc_bulk(rxq->mp, mbufs, batch_count);
+       if (ret) {
+               DP_LOG(ERR, "failed to allocate mbufs for RX");
+               rxq->stats.nombuf += count;
+
+               /* Bail out to ring doorbell for posted packets */
+               goto out;
+       }
 
 #ifdef RTE_ARCH_32
        rxq->wqe_cnt_to_short_db = 0;
 #endif
-       for (i = 0; i < rxq->num_desc; i++) {
-               ret = mana_alloc_and_post_rx_wqe(rxq);
+       for (i = 0; i < batch_count; i++) {
+               ret = mana_post_rx_wqe(rxq, mbufs[i]);
                if (ret) {
                        DP_LOG(ERR, "failed to post RX ret = %d", ret);
-                       return ret;
+                       break;
                }
 
 #ifdef RTE_ARCH_32
@@ -144,8 +150,15 @@ mana_alloc_and_post_rx_wqes(struct mana_rxq *rxq)
 #endif
        }
 
-       mana_rq_ring_doorbell(rxq);
+       /* Free the remaining mbufs that are not posted */
+       rte_pktmbuf_free_bulk(&mbufs[i], batch_count - i);
+
+       count -= batch_count;
+       if (count > 0)
+               goto more_mbufs;
 
+out:
+       mana_rq_ring_doorbell(rxq);
        return ret;
 }
 
@@ -404,7 +417,9 @@ mana_start_rx_queues(struct rte_eth_dev *dev)
        }
 
        for (i = 0; i < priv->num_queues; i++) {
-               ret = mana_alloc_and_post_rx_wqes(dev->data->rx_queues[i]);
+               struct mana_rxq *rxq = dev->data->rx_queues[i];
+
+               ret = mana_alloc_and_post_rx_wqes(rxq, rxq->num_desc);
                if (ret)
                        goto fail;
        }
@@ -423,7 +438,7 @@ uint16_t
 mana_rx_burst(void *dpdk_rxq, struct rte_mbuf **pkts, uint16_t pkts_n)
 {
        uint16_t pkt_received = 0;
-       uint16_t wqe_posted = 0;
+       uint16_t wqe_consumed = 0;
        struct mana_rxq *rxq = dpdk_rxq;
        struct mana_priv *priv = rxq->priv;
        struct rte_mbuf *mbuf;
@@ -535,18 +550,23 @@ mana_rx_burst(void *dpdk_rxq, struct rte_mbuf **pkts, uint16_t pkts_n)
 
                rxq->gdma_rq.tail += desc->wqe_size_in_bu;
 
-               /* Consume this request and post another request */
-               ret = mana_alloc_and_post_rx_wqe(rxq);
-               if (ret) {
-                       DP_LOG(ERR, "failed to post rx wqe ret=%d", ret);
-                       break;
-               }
-
-               wqe_posted++;
+               /* Record the number of RX WQEs we need to post to replenish
+                * consumed RX requests
+                */
+               wqe_consumed++;
                if (pkt_received == pkts_n)
                        break;
 
 #ifdef RTE_ARCH_32
+               /* Always post WQE as soon as it's consumed for short DB */
+               ret = mana_alloc_and_post_rx_wqes(rxq, wqe_consumed);
+               if (ret) {
+                       DRV_LOG(ERR, "failed to post %d WQEs, ret %d",
+                               wqe_consumed, ret);
+                       return pkt_received;
+               }
+               wqe_consumed = 0;
+
                /* Ring short doorbell if approaching the wqe increment
                 * limit.
                 */
@@ -569,8 +589,12 @@ mana_rx_burst(void *dpdk_rxq, struct rte_mbuf **pkts, uint16_t pkts_n)
                goto repoll;
        }
 
-       if (wqe_posted)
-               mana_rq_ring_doorbell(rxq);
+       if (wqe_consumed) {
+               ret = mana_alloc_and_post_rx_wqes(rxq, wqe_consumed);
+               if (ret)
+                       DRV_LOG(ERR, "failed to post %d WQEs, ret %d",
+                               wqe_consumed, ret);
+       }
 
        return pkt_received;
 }
-- 
2.17.1
