On Wed, Apr 02, 2025 at 03:06:46PM +0000, Alexander Graf wrote:
> Ever since its introduction, the virtio vsock driver has included
> pushback logic that blocks it from taking any new RX packets until the
> TX queue backlog becomes shallower than the virtqueue size.
> 
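
For context, the gate being removed is the one visible further down as
virtio_transport_more_replies(): RX processing stops once the number of
queued reply packets reaches the RX virtqueue size. A minimal sketch of
that old check, simplified from the removed code below (memory-barrier
details elided):

  static bool more_replies(struct virtio_vsock *vsock)
  {
          /* True while queued replies still fit in the RX vring; once
           * false, rx_work bails out and leaves RX callbacks disabled.
           */
          return atomic_read(&vsock->queued_replies) <
                 virtqueue_get_vring_size(vsock->vqs[VSOCK_VQ_RX]);
  }
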
> This logic works fine when you connect a user space application on the
> hypervisor with a virtio-vsock target, because the guest will stop
> receiving data until the host has pulled all outstanding data from the
> VM.
> 
> With Nitro Enclaves however, we connect 2 VMs directly via vsock:
> 
>   Parent      Enclave
> 
>     RX -------- TX
>     TX -------- RX
> 
> This means we now have 2 virtio-vsock backends that both have the
> pushback logic. If the parent's TX queue runs full at the same time as
> the Enclave's, both virtio-vsock drivers fall into the pushback path
> and no longer accept RX traffic. However, one side's RX traffic is the
> other side's TX traffic, so neither driver can make forward progress.
> We're now in a deadlock.
> 
> To resolve this, let's remove that pushback logic altogether and rely on
> higher levels (like credits) to ensure we do not consume unbounded
> memory.
> 
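
As a reminder of what "credits" provides here: each peer advertises its
receive buffer space, so a sender can only keep a bounded number of
bytes in flight per socket even without the pushback logic. Roughly,
simplified from virtio_transport_get_credit() in
virtio_transport_common.c (locking elided):

  static u32 credit_left(struct virtio_vsock_sock *vvs)
  {
          /* Bytes the peer advertised room for, minus bytes we have
           * sent that the peer has not yet consumed.
           */
          return vvs->peer_buf_alloc - (vvs->tx_cnt - vvs->fwd_cnt);
  }
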
> RX and TX work share the same work queue. To prevent an RX flood from
> starving TX (and vice versa) now that the pushback logic is gone, let's
> deliberately reschedule the RX and TX work after processing a fixed
> threshold (256) of packets.
> 
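
The rescheduling relies on a property of the workqueue API: a work
item's PENDING bit is cleared before its function runs, so calling
queue_work() from inside the handler re-arms the item to run again
after it returns. A minimal sketch of the pattern used in both paths
below (have_pkt() and process_pkt() are hypothetical stand-ins):

  static void budgeted_work_fn(struct work_struct *work)
  {
          int pkts = 0;

          while (have_pkt()) {            /* hypothetical helper */
                  if (++pkts > VSOCK_MAX_PKTS_PER_WORK) {
                          /* Run again later so other work items on
                           * the same workqueue can make progress.
                           */
                          queue_work(virtio_vsock_workqueue, work);
                          break;
                  }
                  process_pkt();          /* hypothetical helper */
          }
  }
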
> Fixes: 0ea9e1d3a9e3 ("VSOCK: Introduce virtio_transport.ko")
> Signed-off-by: Alexander Graf <g...@amazon.com>


Acked-by: Michael S. Tsirkin <m...@redhat.com>

> ---
> 
> v1 -> v2:
> 
>   - Rework to use fixed threshold
> 
> v2 -> v3:
> 
>   - Remove superfluous reply variable
> ---
>  net/vmw_vsock/virtio_transport.c | 73 +++++++++-----------------------
>  1 file changed, 19 insertions(+), 54 deletions(-)
> 
> diff --git a/net/vmw_vsock/virtio_transport.c b/net/vmw_vsock/virtio_transport.c
> index f0e48e6911fc..6ae30bf8c85c 100644
> --- a/net/vmw_vsock/virtio_transport.c
> +++ b/net/vmw_vsock/virtio_transport.c
> @@ -26,6 +26,12 @@ static struct virtio_vsock __rcu *the_virtio_vsock;
>  static DEFINE_MUTEX(the_virtio_vsock_mutex); /* protects the_virtio_vsock */
>  static struct virtio_transport virtio_transport; /* forward declaration */
>  
> +/*
> + * Max number of packets processed per work item before requeueing,
> + * so RX and TX work do not starve each other on the shared queue.
> + */
> +#define VSOCK_MAX_PKTS_PER_WORK 256
> +
>  struct virtio_vsock {
>       struct virtio_device *vdev;
>       struct virtqueue *vqs[VSOCK_VQ_MAX];
> @@ -44,8 +50,6 @@ struct virtio_vsock {
>       struct work_struct send_pkt_work;
>       struct sk_buff_head send_pkt_queue;
>  
> -     atomic_t queued_replies;
> -
>       /* The following fields are protected by rx_lock.  vqs[VSOCK_VQ_RX]
>        * must be accessed with rx_lock held.
>        */
> @@ -158,7 +162,7 @@ virtio_transport_send_pkt_work(struct work_struct *work)
>               container_of(work, struct virtio_vsock, send_pkt_work);
>       struct virtqueue *vq;
>       bool added = false;
> -     bool restart_rx = false;
> +     int pkts = 0;
>  
>       mutex_lock(&vsock->tx_lock);
>  
> @@ -169,32 +173,24 @@ virtio_transport_send_pkt_work(struct work_struct *work)
>  
>       for (;;) {
>               struct sk_buff *skb;
> -             bool reply;
>               int ret;
>  
> +             if (++pkts > VSOCK_MAX_PKTS_PER_WORK) {
> +                     /* Allow other work items on the same queue to run */
> +                     queue_work(virtio_vsock_workqueue, work);
> +                     break;
> +             }
> +
>               skb = virtio_vsock_skb_dequeue(&vsock->send_pkt_queue);
>               if (!skb)
>                       break;
>  
> -             reply = virtio_vsock_skb_reply(skb);
> -
>               ret = virtio_transport_send_skb(skb, vq, vsock, GFP_KERNEL);
>               if (ret < 0) {
>                       virtio_vsock_skb_queue_head(&vsock->send_pkt_queue, skb);
>                       break;
>               }
>  
> -             if (reply) {
> -                     struct virtqueue *rx_vq = vsock->vqs[VSOCK_VQ_RX];
> -                     int val;
> -
> -                     val = atomic_dec_return(&vsock->queued_replies);
> -
> -                     /* Do we now have resources to resume rx processing? */
> -                     if (val + 1 == virtqueue_get_vring_size(rx_vq))
> -                             restart_rx = true;
> -             }
> -
>               added = true;
>       }
>  
> @@ -203,9 +199,6 @@ virtio_transport_send_pkt_work(struct work_struct *work)
>  
>  out:
>       mutex_unlock(&vsock->tx_lock);
> -
> -     if (restart_rx)
> -             queue_work(virtio_vsock_workqueue, &vsock->rx_work);
>  }
>  
>  /* Caller need to hold RCU for vsock.
> @@ -261,9 +254,6 @@ virtio_transport_send_pkt(struct sk_buff *skb)
>        */
>       if (!skb_queue_empty_lockless(&vsock->send_pkt_queue) ||
>           virtio_transport_send_skb_fast_path(vsock, skb)) {
> -             if (virtio_vsock_skb_reply(skb))
> -                     atomic_inc(&vsock->queued_replies);
> -
>               virtio_vsock_skb_queue_tail(&vsock->send_pkt_queue, skb);
>               queue_work(virtio_vsock_workqueue, &vsock->send_pkt_work);
>       }
> @@ -277,7 +267,7 @@ static int
>  virtio_transport_cancel_pkt(struct vsock_sock *vsk)
>  {
>       struct virtio_vsock *vsock;
> -     int cnt = 0, ret;
> +     int ret;
>  
>       rcu_read_lock();
>       vsock = rcu_dereference(the_virtio_vsock);
> @@ -286,17 +276,7 @@ virtio_transport_cancel_pkt(struct vsock_sock *vsk)
>               goto out_rcu;
>       }
>  
> -     cnt = virtio_transport_purge_skbs(vsk, &vsock->send_pkt_queue);
> -
> -     if (cnt) {
> -             struct virtqueue *rx_vq = vsock->vqs[VSOCK_VQ_RX];
> -             int new_cnt;
> -
> -             new_cnt = atomic_sub_return(cnt, &vsock->queued_replies);
> -             if (new_cnt + cnt >= virtqueue_get_vring_size(rx_vq) &&
> -                 new_cnt < virtqueue_get_vring_size(rx_vq))
> -                     queue_work(virtio_vsock_workqueue, &vsock->rx_work);
> -     }
> +     virtio_transport_purge_skbs(vsk, &vsock->send_pkt_queue);
>  
>       ret = 0;
>  
> @@ -367,18 +347,6 @@ static void virtio_transport_tx_work(struct work_struct *work)
>               queue_work(virtio_vsock_workqueue, &vsock->send_pkt_work);
>  }
>  
> -/* Is there space left for replies to rx packets? */
> -static bool virtio_transport_more_replies(struct virtio_vsock *vsock)
> -{
> -     struct virtqueue *vq = vsock->vqs[VSOCK_VQ_RX];
> -     int val;
> -
> -     smp_rmb(); /* paired with atomic_inc() and atomic_dec_return() */
> -     val = atomic_read(&vsock->queued_replies);
> -
> -     return val < virtqueue_get_vring_size(vq);
> -}
> -
>  /* event_lock must be held */
>  static int virtio_vsock_event_fill_one(struct virtio_vsock *vsock,
>                                      struct virtio_vsock_event *event)
> @@ -613,6 +581,7 @@ static void virtio_transport_rx_work(struct work_struct *work)
>       struct virtio_vsock *vsock =
>               container_of(work, struct virtio_vsock, rx_work);
>       struct virtqueue *vq;
> +     int pkts = 0;
>  
>       vq = vsock->vqs[VSOCK_VQ_RX];
>  
> @@ -627,11 +596,9 @@ static void virtio_transport_rx_work(struct work_struct *work)
>                       struct sk_buff *skb;
>                       unsigned int len;
>  
> -                     if (!virtio_transport_more_replies(vsock)) {
> -                             /* Stop rx until the device processes already
> -                              * pending replies.  Leave rx virtqueue
> -                              * callbacks disabled.
> -                              */
> +                     if (++pkts > VSOCK_MAX_PKTS_PER_WORK) {
> +                             /* Allow other work items on the same queue to run */
> +                             queue_work(virtio_vsock_workqueue, work);
>                               goto out;
>                       }
>  
> @@ -675,8 +642,6 @@ static int virtio_vsock_vqs_init(struct virtio_vsock *vsock)
>       vsock->rx_buf_max_nr = 0;
>       mutex_unlock(&vsock->rx_lock);
>  
> -     atomic_set(&vsock->queued_replies, 0);
> -
>       ret = virtio_find_vqs(vdev, VSOCK_VQ_MAX, vsock->vqs, vqs_info, NULL);
>       if (ret < 0)
>               return ret;
> -- 
> 2.47.1

