On Fri, Apr 12, 2019 at 4:54 PM Xiaolong Ye <xiaolong...@intel.com> wrote:
> As David pointed out, if we reserve N slots, but only submit n slots, > we would end up with an incorrect opinion of the number of available slots > later, we also would get wrong idx when we call xsk_ring_prod__reserve next > time. It also applies to xsk_ring_cons__peek()/xsk_ring_cons__release(). > > This patch ensures that both reserve/submit and peek/release are > consistent. > > Fixes: f1debd77efaf ("net/af_xdp: introduce AF_XDP PMD") > > Reported-by: David Marchand <david.march...@redhat.com> > Signed-off-by: Xiaolong Ye <xiaolong...@intel.com> > --- > drivers/net/af_xdp/rte_eth_af_xdp.c | 80 +++++++++++++++-------------- > 1 file changed, 41 insertions(+), 39 deletions(-) > > diff --git a/drivers/net/af_xdp/rte_eth_af_xdp.c > b/drivers/net/af_xdp/rte_eth_af_xdp.c > index 5cc643ce2..76a6a8331 100644 > --- a/drivers/net/af_xdp/rte_eth_af_xdp.c > +++ b/drivers/net/af_xdp/rte_eth_af_xdp.c > @@ -138,22 +138,19 @@ reserve_fill_queue(struct xsk_umem_info *umem, int > reserve_size) > { > struct xsk_ring_prod *fq = &umem->fq; > uint32_t idx; > - int i, ret; > - > - ret = xsk_ring_prod__reserve(fq, reserve_size, &idx); > - if (unlikely(!ret)) { > - AF_XDP_LOG(ERR, "Failed to reserve enough fq descs.\n"); > - return ret; > - } > + int i; > > for (i = 0; i < reserve_size; i++) { > __u64 *fq_addr; > void *addr = NULL; > if (rte_ring_dequeue(umem->buf_ring, &addr)) { > - i--; > break; > } > - fq_addr = xsk_ring_prod__fill_addr(fq, idx++); > + if (unlikely(!xsk_ring_prod__reserve(fq, 1, &idx))) { > + AF_XDP_LOG(WARNING, "Failed to reserve 1 fq > desc.\n"); > + break; > + } > + fq_addr = xsk_ring_prod__fill_addr(fq, idx); > *fq_addr = (uint64_t)addr; > } > > I just spotted that reserve_fill_queue always returns 0. I understand that xsk_configure expects an errors when not succeeding in populating this ring. And for this, it expects a non zero value for this. How about something like (neither tested nor compiled): static inline int reserve_fill_queue(struct xsk_umem_info *umem, int reserve_size) { struct xsk_ring_prod *fq = &umem->fq; void *addrs[reserve_size]; uint32_t idx; int i, ret; if (rte_ring_dequeue_bulk(umem->buf_ring, &addrs, reserve_size, NULL) != reserve_size) { AF_XDP_LOG(DEBUG, "Failed to get enough buffers for fq.\n"); return -1; } ret = xsk_ring_prod__reserve(fq, reserve_size, &idx); if (unlikely(!ret)) { AF_XDP_LOG(DEBUG, "Failed to reserve enough fq descs.\n"); rte_ring_enqueue_bulk(umem->buf_ring, &addrs, reserve_size, NULL); return -1; } for (i = 0; i < reserve_size; i++) { __u64 *fq_addr; fq_addr = xsk_ring_prod__fill_addr(fq, idx++); *fq_addr = (uint64_t)addrs[i]; } xsk_ring_prod__submit(fq, reserve_size); return 0; } @@ -179,6 +176,9 @@ eth_af_xdp_rx(void *queue, struct rte_mbuf **bufs, > uint16_t nb_pkts) > > nb_pkts = RTE_MIN(nb_pkts, ETH_AF_XDP_TX_BATCH_SIZE); > > + if (unlikely(rte_pktmbuf_alloc_bulk(rxq->mb_pool, mbufs, nb_pkts) > != 0)) > + return 0; > + > rcvd = xsk_ring_cons__peek(rx, nb_pkts, &idx_rx); > if (rcvd == 0) > return 0; > When xsk_ring_cons__peek() returns 0, we will leak nb_pkts freshly allocated mbufs. See below for a suggestion. @@ -186,9 +186,6 @@ eth_af_xdp_rx(void *queue, struct rte_mbuf **bufs, > uint16_t nb_pkts) > if (xsk_prod_nb_free(fq, free_thresh) >= free_thresh) > (void)reserve_fill_queue(umem, ETH_AF_XDP_RX_BATCH_SIZE); > > - if (unlikely(rte_pktmbuf_alloc_bulk(rxq->mb_pool, mbufs, rcvd) != > 0)) > - return 0; > - > for (i = 0; i < rcvd; i++) { > const struct xdp_desc *desc; > uint64_t addr; > @@ -211,6 +208,10 @@ eth_af_xdp_rx(void *queue, struct rte_mbuf **bufs, > uint16_t nb_pkts) > > xsk_ring_cons__release(rx, rcvd); > > + /* free the extra mbufs */ > + for (; rcvd < nb_pkts; rcvd++) > + rte_pktmbuf_free(mbufs[rcvd]); > + > You can move this block after the statistic update... /* statistics */ > rxq->stats.rx_pkts += (rcvd - dropped); > rxq->stats.rx_bytes += rx_bytes; > ... then define a out: label. And those mbufs are still clean and coming from a single mempool, we can put them back as a single bulk. Something like (again, untested): out: if (count != nb_pkts) { rte_mempool_put_bulk(rxq->mb_pool, &mbufs[count], nb_pkts - count); } return count; } And you would jump to this label when xsk_ring_cons__peek() == 0. What do you think ? @@ -261,55 +262,56 @@ eth_af_xdp_tx(void *queue, struct rte_mbuf **bufs, > uint16_t nb_pkts) > struct xsk_umem_info *umem = txq->pair->umem; > struct rte_mbuf *mbuf; > void *addrs[ETH_AF_XDP_TX_BATCH_SIZE]; > + struct rte_mbuf *valid_bufs[ETH_AF_XDP_TX_BATCH_SIZE]; > unsigned long tx_bytes = 0; > - int i, valid = 0; > + int i; > + uint16_t nb_valid = 0; > uint32_t idx_tx; > + uint32_t buf_len = ETH_AF_XDP_FRAME_SIZE - > ETH_AF_XDP_DATA_HEADROOM; > > nb_pkts = RTE_MIN(nb_pkts, ETH_AF_XDP_TX_BATCH_SIZE); > > pull_umem_cq(umem, nb_pkts); > > - nb_pkts = rte_ring_dequeue_bulk(umem->buf_ring, addrs, > - nb_pkts, NULL); > - if (nb_pkts == 0) > + for (i = 0; i < nb_pkts; i++) { > + if (bufs[i]->pkt_len <= buf_len) > + valid_bufs[nb_valid++] = bufs[i]; > + else > + rte_pktmbuf_free(bufs[i]); > + } > + > + nb_valid = rte_ring_dequeue_bulk(umem->buf_ring, addrs, > + nb_valid, NULL); > + if (nb_valid == 0) > return 0; > > You can't return 0. You have stolen buffers from the caller with the previous check on pktlen. When the application resends this bulk or frees the whole bulk, we will have mbuf reuse bugs. Thinking about this, why would this happen ? This limitation should be handled by properly reporting the mtu. The application would then know it can't send those too big mbufs. If I missed something else and/or if you still don't trust the application, I think the tx burst function should go like as described below. The first thing to do is check the mbufs length. At the first invalid length, you break from the loop at index i (i is the invalid buffer index). Then dequeue i - 1 buffers from buf_ring. Reserve i - 1 slots in tx. And return i buffers have been sent (plus a tx error stat += 1). You need to carefully take into account each step and free the buffer to buf_ring when relevant and free the mbufs properly. - if (xsk_ring_prod__reserve(&txq->tx, nb_pkts, &idx_tx) != nb_pkts) { > + if (xsk_ring_prod__reserve(&txq->tx, nb_valid, &idx_tx) != > nb_valid) { > kick_tx(txq); > - rte_ring_enqueue_bulk(umem->buf_ring, addrs, nb_pkts, > NULL); > + rte_ring_enqueue_bulk(umem->buf_ring, addrs, nb_valid, > NULL); > return 0; > } > > - for (i = 0; i < nb_pkts; i++) { > + for (i = 0; i < nb_valid; i++) { > struct xdp_desc *desc; > void *pkt; > - uint32_t buf_len = ETH_AF_XDP_FRAME_SIZE > - - ETH_AF_XDP_DATA_HEADROOM; > desc = xsk_ring_prod__tx_desc(&txq->tx, idx_tx + i); > - mbuf = bufs[i]; > - if (mbuf->pkt_len <= buf_len) { > - desc->addr = (uint64_t)addrs[valid]; > - desc->len = mbuf->pkt_len; > - pkt = xsk_umem__get_data(umem->mz->addr, > - desc->addr); > - rte_memcpy(pkt, rte_pktmbuf_mtod(mbuf, void *), > - desc->len); > - valid++; > - tx_bytes += mbuf->pkt_len; > - } > + mbuf = valid_bufs[i]; > + desc->addr = (uint64_t)addrs[i]; > + desc->len = mbuf->pkt_len; > + pkt = xsk_umem__get_data(umem->mz->addr, > + desc->addr); > + rte_memcpy(pkt, rte_pktmbuf_mtod(mbuf, void *), > + desc->len); > + tx_bytes += mbuf->pkt_len; > rte_pktmbuf_free(mbuf); > } > > - xsk_ring_prod__submit(&txq->tx, nb_pkts); > + xsk_ring_prod__submit(&txq->tx, nb_valid); > > kick_tx(txq); > > - if (valid < nb_pkts) > - rte_ring_enqueue_bulk(umem->buf_ring, &addrs[valid], > - nb_pkts - valid, NULL); > - > - txq->stats.err_pkts += nb_pkts - valid; > - txq->stats.tx_pkts += valid; > + txq->stats.err_pkts += nb_pkts - nb_valid; > + txq->stats.tx_pkts += nb_valid; > txq->stats.tx_bytes += tx_bytes; > > return nb_pkts; > -- > 2.17.1 > > -- David Marchand