Hi, William. Nice work.
I have some feedback and questions.

On 18/12/27 (木) 5:27:49, William Tu wrote:
The patch adds support for AF_XDP async xmit.  Users can use
AF_XDP on both sides of the veth and get better performance, with
the cost of ksoftirqd doing the xmit.  The veth_xsk_async_xmit
simply kicks the napi function, veth_poll, to receive the packets
that are on the umem transmit ring at the _peer_ side.

Tested using two namespaces, one runs xdpsock and the other runs
xdp_rxq_info.  A simple script comparing the performance with/without
AF_XDP shows improvement from 724Kpps to 1.1Mpps.

   ip netns add at_ns0
   ip link add p0 type veth peer name p1
   ip link set p0 netns at_ns0
   ip link set dev p1 up
   ip netns exec at_ns0 ip link set dev p0 up

   # receiver
   ip netns exec at_ns0 xdp_rxq_info --dev p0 --action XDP_DROP

   # sender
   xdpsock -i p1 -t -N -z
   or
   xdpsock -i p1 -t -S

Signed-off-by: William Tu <u9012...@gmail.com>
---
  drivers/net/veth.c | 200 ++++++++++++++++++++++++++++++++++++++++++++++++++++-
  1 file changed, 199 insertions(+), 1 deletion(-)

diff --git a/drivers/net/veth.c b/drivers/net/veth.c
index f412ea1cef18..10cf9ded59f1 100644
--- a/drivers/net/veth.c
+++ b/drivers/net/veth.c
@@ -25,6 +25,10 @@
  #include <linux/ptr_ring.h>
  #include <linux/bpf_trace.h>
  #include <linux/net_tstamp.h>
+#include <net/xdp_sock.h>
+#include <linux/mm.h>
+#include <linux/slab.h>
+#include <net/page_pool.h>
#define DRV_NAME "veth"
  #define DRV_VERSION   "1.0"
@@ -53,6 +57,8 @@ struct veth_rq {
        bool                    rx_notify_masked;
        struct ptr_ring         xdp_ring;
        struct xdp_rxq_info     xdp_rxq;
+       struct xdp_umem *xsk_umem;
+       u16 qid;

veth_rq has a 31-byte hole after rx_notify_masked, so these new fields should be placed there.

  };
struct veth_priv {
@@ -737,11 +743,95 @@ static int veth_xdp_rcv(struct veth_rq *rq, int budget, 
unsigned int *xdp_xmit)
        return done;
  }
+static int veth_xsk_poll(struct napi_struct *napi, int budget)
+{
+       struct veth_priv *priv, *peer_priv;
+       struct net_device *dev, *peer_dev;
+       struct veth_rq *peer_rq;
+       struct veth_rq *rq =
+               container_of(napi, struct veth_rq, xdp_napi);
+       int done = 0;
+
+       dev = rq->dev;
+       priv = netdev_priv(dev);
+       peer_dev = priv->peer;
+       peer_priv = netdev_priv(peer_dev);
+       peer_rq = &peer_priv->rq[rq->qid];
+
+       while (peer_rq->xsk_umem && budget--) {
+               unsigned int inner_xdp_xmit = 0;
+               unsigned int metasize = 0;
+               struct xdp_frame *xdpf;
+               bool dropped = false;
+               struct sk_buff *skb;
+               struct page *page;
+               void *vaddr;
+               void *addr;
+               u32 len;
+
+               if (!xsk_umem_consume_tx_virtual(peer_rq->xsk_umem, &vaddr, 
&len))
+                       break;

How do you prevent races around xsk_umem?
It seems you are checking xsk_umem in the while() condition above, but there is no guarantee that xsk_umem is still non-NULL here, since the umem can be disabled under us.

+
+               page = dev_alloc_page();
+               if (!page) {
+                       xsk_umem_complete_tx(peer_rq->xsk_umem, 1);
+                       xsk_umem_consume_tx_done(peer_rq->xsk_umem);
+                       return -ENOMEM;
+               }
+
+               addr = page_to_virt(page);
+               xdpf = addr;
+               memset(xdpf, 0, sizeof(*xdpf));
+
+               addr += sizeof(*xdpf);
+               memcpy(addr, vaddr, len);
+
+               xdpf->data = addr + metasize;
+               xdpf->len = len;
+               xdpf->headroom = 0;
+               xdpf->metasize = metasize;
+               xdpf->mem.type = MEM_TYPE_PAGE_SHARED;
+
+               /* put into rq */
+               skb = veth_xdp_rcv_one(rq, xdpf, &inner_xdp_xmit);
+               if (!skb) {
+                       /* Peer side has XDP program attached */
+                       if (inner_xdp_xmit & VETH_XDP_TX) {
+                               /* Not supported */
+                               pr_warn("veth: peer XDP_TX not supported\n");

Since this can be triggered by users, we need to ratelimit it at least.

But since this is envisioned to be used in OVS, XDP_TX would be a very important feature to me. I expect XDP programs in containers to process packets and send them back to OVS.

+                               xdp_return_frame(xdpf);
+                               dropped = true;
+                               goto skip_tx;
+                       } else if (inner_xdp_xmit & VETH_XDP_REDIR) {
+                               xdp_do_flush_map();
+                       } else {
+                               dropped = true;
+                       }
+               } else {
+                       napi_gro_receive(&rq->xdp_napi, skb);
+               }
+skip_tx:
+               xsk_umem_complete_tx(peer_rq->xsk_umem, 1);
+               xsk_umem_consume_tx_done(peer_rq->xsk_umem);
+
+               /* update rq stats */
+               u64_stats_update_begin(&rq->stats.syncp);
+               rq->stats.xdp_packets++;
+               rq->stats.xdp_bytes += len;
+               if (dropped)
+                       rq->stats.xdp_drops++;
+               u64_stats_update_end(&rq->stats.syncp);
+               done++;
+       }
+       return done;
+}
+
  static int veth_poll(struct napi_struct *napi, int budget)
  {
        struct veth_rq *rq =
                container_of(napi, struct veth_rq, xdp_napi);
        unsigned int xdp_xmit = 0;
+       int tx_done;
        int done;
xdp_set_return_frame_no_direct();
@@ -756,13 +846,17 @@ static int veth_poll(struct napi_struct *napi, int budget)
                }
        }
+ tx_done = veth_xsk_poll(napi, budget);
+       if (tx_done > 0)
+               done += tx_done;
+

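This can process more packets than budget: veth_xsk_poll() is called with the full budget and its result is added on top of done, so clamping the return value afterwards does not undo the extra work.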

        if (xdp_xmit & VETH_XDP_TX)
                veth_xdp_flush(rq->dev);
        if (xdp_xmit & VETH_XDP_REDIR)
                xdp_do_flush_map();
        xdp_clear_return_frame_no_direct();
- return done;
+       return done > budget ? budget : done;
  }
static int veth_napi_add(struct net_device *dev)
@@ -776,6 +870,7 @@ static int veth_napi_add(struct net_device *dev)
                err = ptr_ring_init(&rq->xdp_ring, VETH_RING_SIZE, GFP_KERNEL);
                if (err)
                        goto err_xdp_ring;
+               rq->qid = i;
        }
for (i = 0; i < dev->real_num_rx_queues; i++) {
@@ -812,6 +907,7 @@ static void veth_napi_del(struct net_device *dev)
                netif_napi_del(&rq->xdp_napi);
                rq->rx_notify_masked = false;
                ptr_ring_cleanup(&rq->xdp_ring, veth_ptr_free);
+               rq->qid = -1;
        }
  }
@@ -836,6 +932,7 @@ static int veth_enable_xdp(struct net_device *dev) /* Save original mem info as it can be overwritten */
                        rq->xdp_mem = rq->xdp_rxq.mem;
+                       rq->qid = i;
                }
err = veth_napi_add(dev);
@@ -1115,6 +1212,84 @@ static u32 veth_xdp_query(struct net_device *dev)
        return 0;
  }
+int veth_xsk_umem_query(struct net_device *dev, struct xdp_umem **umem,
+                       u16 qid)
+{
+       struct xdp_umem *queried_umem;
+
+       queried_umem = xdp_get_umem_from_qid(dev, qid);
+
+       if (!queried_umem)
+               return -EINVAL;
+
+       *umem = queried_umem;
+       return 0;
+}
+
+static int veth_xsk_umem_enable(struct net_device *dev,
+                               struct xdp_umem *umem,
+                               u16 qid)
+{
+       struct veth_priv *priv = netdev_priv(dev);
+       struct xdp_umem_fq_reuse *reuseq;
+       int err = 0;
+
+       if (qid >= dev->real_num_rx_queues)
+               return -EINVAL;
+
+       reuseq = xsk_reuseq_prepare(priv->rq[0].xdp_ring.size);
+       if (!reuseq)
+               return -ENOMEM;
+
+       xsk_reuseq_free(xsk_reuseq_swap(umem, reuseq));
+
+       priv->rq[qid].xsk_umem = umem;
+
+       return err;
+}
+
+static int veth_xsk_umem_disable(struct net_device *dev,
+                                u16 qid)
+{
+       struct veth_priv *priv = netdev_priv(dev);
+       struct xdp_umem *umem;
+
+       umem = xdp_get_umem_from_qid(dev, qid);
+       if (!umem)
+               return -EINVAL;
+
+       priv->rq[qid].xsk_umem = NULL;
+       return 0;
+}
+
+int veth_xsk_umem_setup(struct net_device *dev, struct xdp_umem *umem,
+                       u16 qid)
+{
+       return umem ? veth_xsk_umem_enable(dev, umem, qid) :
+                     veth_xsk_umem_disable(dev, qid);
+}
+
+int veth_xsk_async_xmit(struct net_device *dev, u32 qid)
+{
+       struct veth_priv *priv, *peer_priv;
+       struct net_device *peer_dev;
+       struct veth_rq *peer_rq;
+
+       priv = netdev_priv(dev);
+       peer_dev = priv->peer;
+       peer_priv = netdev_priv(peer_dev);
+       peer_rq = &peer_priv->rq[qid];
+
+       if (qid >= dev->real_num_rx_queues)
+               return -ENXIO;
+
+       /* Schedule the peer side NAPI to receive */
+       if (!napi_if_scheduled_mark_missed(&peer_rq->xdp_napi))
+               napi_schedule(&peer_rq->xdp_napi);
+
+       return 0;
+}
+
  static int veth_xdp(struct net_device *dev, struct netdev_bpf *xdp)
  {
        switch (xdp->command) {
@@ -1123,6 +1298,28 @@ static int veth_xdp(struct net_device *dev, struct 
netdev_bpf *xdp)
        case XDP_QUERY_PROG:
                xdp->prog_id = veth_xdp_query(dev);
                return 0;
+       case XDP_QUERY_XSK_UMEM:
+               return veth_xsk_umem_query(dev, &xdp->xsk.umem,
+                                          xdp->xsk.queue_id);
+       case XDP_SETUP_XSK_UMEM: {
+               struct veth_priv *priv;
+               int err;
+
+               /* Enable NAPI on both sides, by enabling
+                * their XDP.
+                */
+               err = veth_enable_xdp(dev);

It looks like there is no need to enable XDP on this side. You only use the peer's NAPI, right?

+               if (err)
+                       return err;
+
+               priv = netdev_priv(dev);
+               err = veth_enable_xdp(priv->peer);

NAPI is enabled here but never disabled?
Also, what happens if the peer later disables XDP by detaching its XDP program?
You probably need something like refcounting.


BTW, I'm not 100% sure that the current way of accessing the peer's rq in the NAPI handler is safe, although I did not find an obvious problem.

Looking at physical NIC drivers, ndo_xsk_async_xmit() is expected to kick the TX softirq, right? In veth, that would be something like the process below.

Say you attach an xsk to veth A, and B is the peer of A.

1. async_xmit kicks A's NAPI
2. A's NAPI drains the xsk tx_ring and pushes frames into B's xdp_ring, then kicks B's NAPI
3. B's NAPI drains its xdp_ring and processes the xdp_frames in the same way as normal xdp_frames.

What you are currently doing seems to be skipping step 2 and making B's NAPI directly drain the tx_ring of the xsk bound to A.

I don't have a particular opinion about which is better. Probably your approach is more efficient performance-wise? If you later find a race with your approach, please consider the more physical-NIC-like approach I described above.

+               if (err)
+                       return err;
+
+               return veth_xsk_umem_setup(dev, xdp->xsk.umem,
+                                          xdp->xsk.queue_id);
+       }
        default:
                return -EINVAL;
        }
@@ -1145,6 +1342,7 @@ static const struct net_device_ops veth_netdev_ops = {
        .ndo_set_rx_headroom    = veth_set_rx_headroom,
        .ndo_bpf                = veth_xdp,
        .ndo_xdp_xmit           = veth_xdp_xmit,
+       .ndo_xsk_async_xmit     = veth_xsk_async_xmit,
  };
#define VETH_FEATURES (NETIF_F_SG | NETIF_F_FRAGLIST | NETIF_F_HW_CSUM | \

Reply via email to