On Wed, Apr 02, 2025 at 12:14:24PM -0400, Stefan Hajnoczi wrote:
> On Tue, Apr 01, 2025 at 08:13:49PM +0000, Alexander Graf wrote:
> > Ever since the introduction of the virtio vsock driver, it included
> > pushback logic that blocks it from taking any new RX packets until the
> > TX queue backlog becomes shallower than the virtqueue size.
> >
> > This logic works fine when you connect a user space application on the
> > hypervisor with a virtio-vsock target, because the guest will stop
> > receiving data until the host pulled all outstanding data from the VM.
> >
> > With Nitro Enclaves however, we connect 2 VMs directly via vsock:
> >
> >   Parent      Enclave
> >
> >     RX -------- TX
> >     TX -------- RX
> >
> > This means we now have 2 virtio-vsock backends that both have the pushback
> > logic. If the parent's TX queue runs full at the same time as the
> > Enclave's, both virtio-vsock drivers fall into the pushback path and
> > no longer accept RX traffic. However, that RX traffic is TX traffic on
> > the other side which blocks that driver from making any forward
> > progress. We're now in a deadlock.
> >
> > To resolve this, let's remove that pushback logic altogether and rely on
> > higher levels (like credits) to ensure we do not consume unbounded
> > memory.
>
> The reason for queued_replies is that rx packet processing may emit tx
> packets. Therefore tx virtqueue space is required in order to process
> the rx virtqueue.
>
> queued_replies puts a bound on the amount of tx packets that can be
> queued in memory so the other side cannot consume unlimited memory. Once
> that bound has been reached, rx processing stops until the other side
> frees up tx virtqueue space.
>
> It's been a while since I looked at this problem, so I don't have a
> solution ready. In fact, last time I thought about it I wondered if the
> design of virtio-vsock fundamentally suffers from deadlocks.
>
> I don't think removing queued_replies is possible without a replacement
> for the bounded memory and virtqueue exhaustion issue though. Credits
> are not a solution - they are about socket buffer space, not about
> virtqueue space, which includes control packets that are not accounted
> by socket buffer space.

Hmm.
Actually, let's think about which packets require a response:

VIRTIO_VSOCK_OP_REQUEST
VIRTIO_VSOCK_OP_SHUTDOWN
VIRTIO_VSOCK_OP_CREDIT_REQUEST

The response to each of these always reports the state of an existing
socket, and only one type of response is relevant for each socket.

So here's my suggestion: stop queueing replies on the vsock device.
Instead, simply store the response on the socket, and create a list
of sockets that have replies to be transmitted.

WDYT?

> >
> > RX and TX queues share the same work queue. To prevent starvation of TX
> > by an RX flood and vice versa now that the pushback logic is gone, let's
> > deliberately reschedule RX and TX work after a fixed threshold (256) of
> > packets to process.
> >
> > Fixes: 0ea9e1d3a9e3 ("VSOCK: Introduce virtio_transport.ko")
> > Signed-off-by: Alexander Graf <g...@amazon.com>
> > ---
> >  net/vmw_vsock/virtio_transport.c | 70 +++++++++-----------------------
> >  1 file changed, 19 insertions(+), 51 deletions(-)
> >
> > diff --git a/net/vmw_vsock/virtio_transport.c b/net/vmw_vsock/virtio_transport.c
> > index f0e48e6911fc..54030c729767 100644
> > --- a/net/vmw_vsock/virtio_transport.c
> > +++ b/net/vmw_vsock/virtio_transport.c
> > @@ -26,6 +26,12 @@ static struct virtio_vsock __rcu *the_virtio_vsock;
> >  static DEFINE_MUTEX(the_virtio_vsock_mutex); /* protects the_virtio_vsock */
> >  static struct virtio_transport virtio_transport; /* forward declaration */
> >
> > +/*
> > + * Max number of RX packets transferred before requeueing so we do
> > + * not starve TX traffic because they share the same work queue.
> > + */
> > +#define VSOCK_MAX_PKTS_PER_WORK 256
> > +
> >  struct virtio_vsock {
> >  	struct virtio_device *vdev;
> >  	struct virtqueue *vqs[VSOCK_VQ_MAX];
> > @@ -44,8 +50,6 @@ struct virtio_vsock {
> >  	struct work_struct send_pkt_work;
> >  	struct sk_buff_head send_pkt_queue;
> >
> > -	atomic_t queued_replies;
> > -
> >  	/* The following fields are protected by rx_lock.  vqs[VSOCK_VQ_RX]
> >  	 * must be accessed with rx_lock held.
> >  	 */
> > @@ -158,7 +162,7 @@ virtio_transport_send_pkt_work(struct work_struct *work)
> >  		container_of(work, struct virtio_vsock, send_pkt_work);
> >  	struct virtqueue *vq;
> >  	bool added = false;
> > -	bool restart_rx = false;
> > +	int pkts = 0;
> >
> >  	mutex_lock(&vsock->tx_lock);
> >
> > @@ -172,6 +176,12 @@ virtio_transport_send_pkt_work(struct work_struct *work)
> >  		bool reply;
> >  		int ret;
> >
> > +		if (++pkts > VSOCK_MAX_PKTS_PER_WORK) {
> > +			/* Allow other works on the same queue to run */
> > +			queue_work(virtio_vsock_workqueue, work);
> > +			break;
> > +		}
> > +
> >  		skb = virtio_vsock_skb_dequeue(&vsock->send_pkt_queue);
> >  		if (!skb)
> >  			break;
> > @@ -184,17 +194,6 @@ virtio_transport_send_pkt_work(struct work_struct *work)
> >  			break;
> >  		}
> >
> > -		if (reply) {
> > -			struct virtqueue *rx_vq = vsock->vqs[VSOCK_VQ_RX];
> > -			int val;
> > -
> > -			val = atomic_dec_return(&vsock->queued_replies);
> > -
> > -			/* Do we now have resources to resume rx processing? */
> > -			if (val + 1 == virtqueue_get_vring_size(rx_vq))
> > -				restart_rx = true;
> > -		}
> > -
> >  		added = true;
> >  	}
> >
> > @@ -203,9 +202,6 @@ virtio_transport_send_pkt_work(struct work_struct *work)
> >
> >  out:
> >  	mutex_unlock(&vsock->tx_lock);
> > -
> > -	if (restart_rx)
> > -		queue_work(virtio_vsock_workqueue, &vsock->rx_work);
> >  }
> >
> >  /* Caller need to hold RCU for vsock.
> > @@ -261,9 +257,6 @@ virtio_transport_send_pkt(struct sk_buff *skb)
> >  	 */
> >  	if (!skb_queue_empty_lockless(&vsock->send_pkt_queue) ||
> >  	    virtio_transport_send_skb_fast_path(vsock, skb)) {
> > -		if (virtio_vsock_skb_reply(skb))
> > -			atomic_inc(&vsock->queued_replies);
> > -
> >  		virtio_vsock_skb_queue_tail(&vsock->send_pkt_queue, skb);
> >  		queue_work(virtio_vsock_workqueue, &vsock->send_pkt_work);
> >  	}
> > @@ -277,7 +270,7 @@ static int
> >  virtio_transport_cancel_pkt(struct vsock_sock *vsk)
> >  {
> >  	struct virtio_vsock *vsock;
> > -	int cnt = 0, ret;
> > +	int ret;
> >
> >  	rcu_read_lock();
> >  	vsock = rcu_dereference(the_virtio_vsock);
> > @@ -286,17 +279,7 @@ virtio_transport_cancel_pkt(struct vsock_sock *vsk)
> >  		goto out_rcu;
> >  	}
> >
> > -	cnt = virtio_transport_purge_skbs(vsk, &vsock->send_pkt_queue);
> > -
> > -	if (cnt) {
> > -		struct virtqueue *rx_vq = vsock->vqs[VSOCK_VQ_RX];
> > -		int new_cnt;
> > -
> > -		new_cnt = atomic_sub_return(cnt, &vsock->queued_replies);
> > -		if (new_cnt + cnt >= virtqueue_get_vring_size(rx_vq) &&
> > -		    new_cnt < virtqueue_get_vring_size(rx_vq))
> > -			queue_work(virtio_vsock_workqueue, &vsock->rx_work);
> > -	}
> > +	virtio_transport_purge_skbs(vsk, &vsock->send_pkt_queue);
> >
> >  	ret = 0;
> >
> > @@ -367,18 +350,6 @@ static void virtio_transport_tx_work(struct work_struct *work)
> >  		queue_work(virtio_vsock_workqueue, &vsock->send_pkt_work);
> >  }
> >
> > -/* Is there space left for replies to rx packets? */
> > -static bool virtio_transport_more_replies(struct virtio_vsock *vsock)
> > -{
> > -	struct virtqueue *vq = vsock->vqs[VSOCK_VQ_RX];
> > -	int val;
> > -
> > -	smp_rmb(); /* paired with atomic_inc() and atomic_dec_return() */
> > -	val = atomic_read(&vsock->queued_replies);
> > -
> > -	return val < virtqueue_get_vring_size(vq);
> > -}
> > -
> >  /* event_lock must be held */
> >  static int virtio_vsock_event_fill_one(struct virtio_vsock *vsock,
> >  				       struct virtio_vsock_event *event)
> > @@ -613,6 +584,7 @@ static void virtio_transport_rx_work(struct work_struct *work)
> >  	struct virtio_vsock *vsock =
> >  		container_of(work, struct virtio_vsock, rx_work);
> >  	struct virtqueue *vq;
> > +	int pkts = 0;
> >
> >  	vq = vsock->vqs[VSOCK_VQ_RX];
> >
> > @@ -627,11 +599,9 @@ static void virtio_transport_rx_work(struct work_struct *work)
> >  			struct sk_buff *skb;
> >  			unsigned int len;
> >
> > -			if (!virtio_transport_more_replies(vsock)) {
> > -				/* Stop rx until the device processes already
> > -				 * pending replies.  Leave rx virtqueue
> > -				 * callbacks disabled.
> > -				 */
> > +			if (++pkts > VSOCK_MAX_PKTS_PER_WORK) {
> > +				/* Allow other works on the same queue to run */
> > +				queue_work(virtio_vsock_workqueue, work);
> >  				goto out;
> >  			}
> >
> > @@ -675,8 +645,6 @@ static int virtio_vsock_vqs_init(struct virtio_vsock *vsock)
> >  	vsock->rx_buf_max_nr = 0;
> >  	mutex_unlock(&vsock->rx_lock);
> >
> > -	atomic_set(&vsock->queued_replies, 0);
> > -
> >  	ret = virtio_find_vqs(vdev, VSOCK_VQ_MAX, vsock->vqs, vqs_info, NULL);
> >  	if (ret < 0)
> >  		return ret;
> > --
> > 2.47.1
> >
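
To make the suggestion above a bit more concrete, here is a rough
userspace sketch of the "store the reply on the socket" idea. It is
not driver code and makes several assumptions: the names demo_sock,
reply_list, sock_set_reply(), flush_replies() and pending_count() are
all hypothetical, the reply_op values are placeholders rather than the
real virtio-vsock op codes, locking is ignored, and "a newer reply
overwrites an older unsent one" is my reading of "only one type of
response is relevant for each socket". The point it tries to show is
that pending-reply memory is bounded by the number of sockets instead
of by the number of rx packets.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Placeholder reply types for illustration; not the real op codes. */
enum reply_op { REPLY_NONE = 0, REPLY_RESPONSE, REPLY_RST, REPLY_CREDIT_UPDATE };

/* Hypothetical per-socket state: at most one pending reply per socket. */
struct demo_sock {
	enum reply_op pending;   /* latest unsent reply, or REPLY_NONE */
	struct demo_sock *next;  /* link in the pending-reply list */
	bool on_list;            /* true while linked into the list */
};

/* FIFO of sockets that have a reply waiting for tx virtqueue space. */
struct reply_list {
	struct demo_sock *head;
	struct demo_sock *tail;
};

/* Record a reply on the socket (assumption: newest reply wins) and
 * link the socket into the list if it is not already there. */
static void sock_set_reply(struct reply_list *l, struct demo_sock *sk,
			   enum reply_op op)
{
	assert(l && sk && op != REPLY_NONE);
	sk->pending = op;
	if (sk->on_list)
		return;
	sk->on_list = true;
	sk->next = NULL;
	if (l->tail)
		l->tail->next = sk;
	else
		l->head = sk;
	l->tail = sk;
}

/* Transmit at most tx_space replies; returns how many were sent. */
static size_t flush_replies(struct reply_list *l, size_t tx_space)
{
	size_t sent = 0;

	while (l->head && sent < tx_space) {
		struct demo_sock *sk = l->head;

		l->head = sk->next;
		if (!l->head)
			l->tail = NULL;
		sk->next = NULL;
		sk->on_list = false;
		/* A real driver would build a packet from sk->pending
		 * and add it to the tx virtqueue here. */
		sk->pending = REPLY_NONE;
		sent++;
	}
	return sent;
}

/* Number of sockets that still have a reply queued. */
static size_t pending_count(const struct reply_list *l)
{
	size_t n = 0;

	for (const struct demo_sock *sk = l->head; sk; sk = sk->next)
		n++;
	return n;
}
```

With this shape, rx processing would never need to stop for lack of
reply space: handling a request only updates the socket, and the tx
side drains the list whenever the virtqueue has room.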