> 
> When receiving packets, the maximum bunch number of mbufs are
> allocated up front; if the hardware does not receive that many
> packets, the redundant mbufs are freed, which hurts performance.
> 
> So optimize rx performance by allocating the number of mbufs based on
> the result of xsk_ring_cons__peek, which avoids the redundant
> allocation and freeing of mbufs on each receive.
> 
> The rx cached_cons must be rolled back if mbuf allocation fails, as
> found by Ciara Loftus.
> 
> Signed-off-by: Li RongQing <lirongq...@baidu.com>
> Signed-off-by: Dongsheng Rong <rongdongsh...@baidu.com>
> ---
> 
> V2: roll back rx cached_cons if mbufs fail to be allocated
> V3: add a comment explaining the rollback of rx cached_cons
> We should create a function for the rollback, as suggested by Ciara
> Loftus, something like xsk_ring_cons__cancel; but this function
> belongs in the kernel, and I will send it to the kernel.
> 
>  drivers/net/af_xdp/rte_eth_af_xdp.c | 73 ++++++++++++++---------------
>  1 file changed, 35 insertions(+), 38 deletions(-)
> 
> diff --git a/drivers/net/af_xdp/rte_eth_af_xdp.c
> b/drivers/net/af_xdp/rte_eth_af_xdp.c
> index 2c7892bd7..69a4d54a3 100644
> --- a/drivers/net/af_xdp/rte_eth_af_xdp.c
> +++ b/drivers/net/af_xdp/rte_eth_af_xdp.c
> @@ -255,28 +255,32 @@ af_xdp_rx_zc(void *queue, struct rte_mbuf
> **bufs, uint16_t nb_pkts)
>       struct xsk_umem_info *umem = rxq->umem;
>       uint32_t idx_rx = 0;
>       unsigned long rx_bytes = 0;
> -     int rcvd, i;
> +     int i;
>       struct rte_mbuf *fq_bufs[ETH_AF_XDP_RX_BATCH_SIZE];
> 
> -     /* allocate bufs for fill queue replenishment after rx */
> -     if (rte_pktmbuf_alloc_bulk(umem->mb_pool, fq_bufs, nb_pkts)) {
> -             AF_XDP_LOG(DEBUG,
> -                     "Failed to get enough buffers for fq.\n");
> -             return 0;
> -     }
> +     nb_pkts = xsk_ring_cons__peek(rx, nb_pkts, &idx_rx);
> 
> -     rcvd = xsk_ring_cons__peek(rx, nb_pkts, &idx_rx);
> -
> -     if (rcvd == 0) {
> +     if (nb_pkts == 0) {
>  #if defined(XDP_USE_NEED_WAKEUP)
>               if (xsk_ring_prod__needs_wakeup(fq))
>                       (void)poll(rxq->fds, 1, 1000);
>  #endif
> 
> -             goto out;
> +             return 0;
> +     }
> +
> +     /* allocate bufs for fill queue replenishment after rx */
> +     if (rte_pktmbuf_alloc_bulk(umem->mb_pool, fq_bufs, nb_pkts)) {
> +             AF_XDP_LOG(DEBUG,
> +                     "Failed to get enough buffers for fq.\n");
> +             /* rollback cached_cons which is added by
> +              * xsk_ring_cons__peek
> +              */

Thanks for adding the comment.
There's a small mistake here.
The function in which cached_cons is added is xsk_ring_cons__peek.
Could you please submit a v4 with this change?
Thanks!
Ciara


> +             rx->cached_cons -= nb_pkts;
> +             return 0;
>       }
> 
> -     for (i = 0; i < rcvd; i++) {
> +     for (i = 0; i < nb_pkts; i++) {
>               const struct xdp_desc *desc;
>               uint64_t addr;
>               uint32_t len;
> @@ -301,20 +305,14 @@ af_xdp_rx_zc(void *queue, struct rte_mbuf
> **bufs, uint16_t nb_pkts)
>               rx_bytes += len;
>       }
> 
> -     xsk_ring_cons__release(rx, rcvd);
> -
> -     (void)reserve_fill_queue(umem, rcvd, fq_bufs, fq);
> +     xsk_ring_cons__release(rx, nb_pkts);
> +     (void)reserve_fill_queue(umem, nb_pkts, fq_bufs, fq);
> 
>       /* statistics */
> -     rxq->stats.rx_pkts += rcvd;
> +     rxq->stats.rx_pkts += nb_pkts;
>       rxq->stats.rx_bytes += rx_bytes;
> 
> -out:
> -     if (rcvd != nb_pkts)
> -             rte_mempool_put_bulk(umem->mb_pool, (void
> **)&fq_bufs[rcvd],
> -                                  nb_pkts - rcvd);
> -
> -     return rcvd;
> +     return nb_pkts;
>  }
>  #else
>  static uint16_t
> @@ -326,7 +324,7 @@ af_xdp_rx_cp(void *queue, struct rte_mbuf **bufs,
> uint16_t nb_pkts)
>       struct xsk_ring_prod *fq = &rxq->fq;
>       uint32_t idx_rx = 0;
>       unsigned long rx_bytes = 0;
> -     int rcvd, i;
> +     int i;
>       uint32_t free_thresh = fq->size >> 1;
>       struct rte_mbuf *mbufs[ETH_AF_XDP_RX_BATCH_SIZE];
> 
> @@ -334,20 +332,24 @@ af_xdp_rx_cp(void *queue, struct rte_mbuf
> **bufs, uint16_t nb_pkts)
>               (void)reserve_fill_queue(umem,
> ETH_AF_XDP_RX_BATCH_SIZE,
>                                        NULL, fq);
> 
> -     if (unlikely(rte_pktmbuf_alloc_bulk(rxq->mb_pool, mbufs, nb_pkts)
> != 0))
> -             return 0;
> -
> -     rcvd = xsk_ring_cons__peek(rx, nb_pkts, &idx_rx);
> -     if (rcvd == 0) {
> +     nb_pkts = xsk_ring_cons__peek(rx, nb_pkts, &idx_rx);
> +     if (nb_pkts == 0) {
>  #if defined(XDP_USE_NEED_WAKEUP)
>               if (xsk_ring_prod__needs_wakeup(fq))
>                       (void)poll(rxq->fds, 1, 1000);
>  #endif
> +             return 0;
> +     }
> 
> -             goto out;
> +     if (unlikely(rte_pktmbuf_alloc_bulk(rxq->mb_pool, mbufs,
> nb_pkts))) {
> +             /* rollback cached_cons which is added by
> +              * xsk_ring_cons__peek
> +              */
> +             rx->cached_cons -= nb_pkts;
> +             return 0;
>       }
> 
> -     for (i = 0; i < rcvd; i++) {
> +     for (i = 0; i < nb_pkts; i++) {
>               const struct xdp_desc *desc;
>               uint64_t addr;
>               uint32_t len;
> @@ -366,18 +368,13 @@ af_xdp_rx_cp(void *queue, struct rte_mbuf
> **bufs, uint16_t nb_pkts)
>               bufs[i] = mbufs[i];
>       }
> 
> -     xsk_ring_cons__release(rx, rcvd);
> +     xsk_ring_cons__release(rx, nb_pkts);
> 
>       /* statistics */
> -     rxq->stats.rx_pkts += rcvd;
> +     rxq->stats.rx_pkts += nb_pkts;
>       rxq->stats.rx_bytes += rx_bytes;
> 
> -out:
> -     if (rcvd != nb_pkts)
> -             rte_mempool_put_bulk(rxq->mb_pool, (void
> **)&mbufs[rcvd],
> -                                  nb_pkts - rcvd);
> -
> -     return rcvd;
> +     return nb_pkts;
>  }
>  #endif
> 
> --
> 2.17.3

Reply via email to