Introduce tun_ring_consume() that wraps ptr_ring_consume() and calls
__tun_wake_queue(). The latter wakes the stopped netdev subqueue once
half of the ring capacity has been consumed, tracked via the new
cons_cnt field in tun_file. cons_cnt is updated while holding the ring
consumer lock, avoiding races. As a safety net, the queue is also woken
when the ring becomes empty. The point is to allow the queue to be
stopped when it gets full, which is required for traffic shaping -
implemented by the following "avoid ptr_ring tail-drop when a qdisc
is present". That patch also explains the pairing of the smp_mb()
of __tun_wake_queue().

Without the corresponding queue stopping, this patch alone causes no
regression for a tap setup sending to a qemu VM: 1.132 Mpps
to 1.144 Mpps.

Details: AMD Ryzen 5 5600X at 4.3 GHz, 3200 MHz RAM, isolated QEMU
threads, pktgen sender; Avg over 50 runs @ 100,000,000 packets;
SRSO and spectre v2 mitigations disabled.

Co-developed-by: Tim Gebauer <[email protected]>
Signed-off-by: Tim Gebauer <[email protected]>
Signed-off-by: Simon Schippers <[email protected]>
---
 drivers/net/tun.c | 54 +++++++++++++++++++++++++++++++++++++++++++----
 1 file changed, 50 insertions(+), 4 deletions(-)

diff --git a/drivers/net/tun.c b/drivers/net/tun.c
index b183189f1853..00ecf128fe8e 100644
--- a/drivers/net/tun.c
+++ b/drivers/net/tun.c
@@ -145,6 +145,7 @@ struct tun_file {
        struct list_head next;
        struct tun_struct *detached;
        struct ptr_ring tx_ring;
+       int cons_cnt;
        struct xdp_rxq_info xdp_rxq;
 };
 
@@ -557,6 +558,13 @@ void tun_ptr_free(void *ptr)
 }
 EXPORT_SYMBOL_GPL(tun_ptr_free);
 
+static void tun_reset_cons_cnt(struct tun_file *tfile)
+{
+       spin_lock(&tfile->tx_ring.consumer_lock);
+       tfile->cons_cnt = 0;
+       spin_unlock(&tfile->tx_ring.consumer_lock);
+}
+
 static void tun_queue_purge(struct tun_file *tfile)
 {
        void *ptr;
@@ -564,6 +572,7 @@ static void tun_queue_purge(struct tun_file *tfile)
        while ((ptr = ptr_ring_consume(&tfile->tx_ring)) != NULL)
                tun_ptr_free(ptr);
 
+       tun_reset_cons_cnt(tfile);
        skb_queue_purge(&tfile->sk.sk_write_queue);
        skb_queue_purge(&tfile->sk.sk_error_queue);
 }
@@ -730,6 +739,7 @@ static int tun_attach(struct tun_struct *tun, struct file 
*file,
                goto out;
        }
 
+       tun_reset_cons_cnt(tfile);
        tfile->queue_index = tun->numqueues;
        tfile->socket.sk->sk_shutdown &= ~RCV_SHUTDOWN;
 
@@ -2115,13 +2125,46 @@ static ssize_t tun_put_user(struct tun_struct *tun,
        return total;
 }
 
-static void *tun_ring_recv(struct tun_file *tfile, int noblock, int *err)
+/* Callers must hold ring.consumer_lock */
+static void __tun_wake_queue(struct tun_struct *tun,
+                            struct tun_file *tfile, int consumed)
+{
+       struct netdev_queue *txq = netdev_get_tx_queue(tun->dev,
+                                               tfile->queue_index);
+
+       /* Paired with smp_mb__after_atomic() in tun_net_xmit() */
+       smp_mb();
+       if (netif_tx_queue_stopped(txq)) {
+               tfile->cons_cnt += consumed;
+               if (tfile->cons_cnt >= tfile->tx_ring.size / 2 ||
+                   __ptr_ring_empty(&tfile->tx_ring)) {
+                       netif_tx_wake_queue(txq);
+                       tfile->cons_cnt = 0;
+               }
+       }
+}
+
+static void *tun_ring_consume(struct tun_struct *tun, struct tun_file *tfile)
+{
+       void *ptr;
+
+       spin_lock(&tfile->tx_ring.consumer_lock);
+       ptr = __ptr_ring_consume(&tfile->tx_ring);
+       if (ptr)
+               __tun_wake_queue(tun, tfile, 1);
+
+       spin_unlock(&tfile->tx_ring.consumer_lock);
+       return ptr;
+}
+
+static void *tun_ring_recv(struct tun_struct *tun, struct tun_file *tfile,
+                          int noblock, int *err)
 {
        DECLARE_WAITQUEUE(wait, current);
        void *ptr = NULL;
        int error = 0;
 
-       ptr = ptr_ring_consume(&tfile->tx_ring);
+       ptr = tun_ring_consume(tun, tfile);
        if (ptr)
                goto out;
        if (noblock) {
@@ -2133,7 +2176,7 @@ static void *tun_ring_recv(struct tun_file *tfile, int 
noblock, int *err)
 
        while (1) {
                set_current_state(TASK_INTERRUPTIBLE);
-               ptr = ptr_ring_consume(&tfile->tx_ring);
+               ptr = tun_ring_consume(tun, tfile);
                if (ptr)
                        break;
                if (signal_pending(current)) {
@@ -2170,7 +2213,7 @@ static ssize_t tun_do_read(struct tun_struct *tun, struct 
tun_file *tfile,
 
        if (!ptr) {
                /* Read frames from ring */
-               ptr = tun_ring_recv(tfile, noblock, &err);
+               ptr = tun_ring_recv(tun, tfile, noblock, &err);
                if (!ptr)
                        return err;
        }
@@ -3406,6 +3449,8 @@ static int tun_chr_open(struct inode *inode, struct file 
* file)
                return -ENOMEM;
        }
 
+       tun_reset_cons_cnt(tfile);
+
        mutex_init(&tfile->napi_mutex);
        RCU_INIT_POINTER(tfile->tun, NULL);
        tfile->flags = 0;
@@ -3614,6 +3659,7 @@ static int tun_queue_resize(struct tun_struct *tun)
        for (i = 0; i < tun->numqueues; i++) {
                tfile = rtnl_dereference(tun->tfiles[i]);
                rings[i] = &tfile->tx_ring;
+               tun_reset_cons_cnt(tfile);
        }
        list_for_each_entry(tfile, &tun->disabled, next)
                rings[i++] = &tfile->tx_ring;
-- 
2.43.0


Reply via email to