From: Stefano Garzarella <[email protected]>

When many small packets accumulate in the receive queue, the skb overhead
can exceed buf_alloc even while the payload is within bounds. This causes
virtio_transport_inc_rx_pkt() to reject packets, leading to connection
resets during large transfers under backpressure.

The issue was reported by Brien, who has a reproducer, but it is also
easily reproducible with iperf-vsock [1] using a small packet size:

  iperf3 --vsock -c $CID -l 129

which fails immediately without this patch but with commit 059b7dbd20a6
("vsock/virtio: fix potential unbounded skb queue").

Inspired by TCP's tcp_collapse() which solves a similar problem, add
virtio_transport_collapse_rx_queue() that walks the receive queue and
re-copies data into compact linear skbs to reduce the overhead.

The collapse is triggered from virtio_transport_recv_enqueue() when
virtio_transport_inc_rx_pkt() fails. A pre-scan counts the eligible bytes
to size each allocation precisely, avoiding waste for isolated small
packets. Partially consumed skbs are kept as-is to preserve
buf_used/fwd_cnt accounting, EOM-marked skbs to maintain SEQPACKET
message boundaries, and skbs already larger than the collapse target
because they already have a good data-to-overhead ratio.

[1] https://github.com/stefano-garzarella/iperf-vsock

Fixes: 059b7dbd20a6 ("vsock/virtio: fix potential unbounded skb queue")
Cc: [email protected]
Reported-by: Brien Oberstein <[email protected]>
Closes: 
https://lore.kernel.org/netdev/[email protected]/
Tested-by: Brien Oberstein <[email protected]>
Signed-off-by: Stefano Garzarella <[email protected]>
---
 net/vmw_vsock/virtio_transport_common.c | 148 +++++++++++++++++++++++-
 1 file changed, 146 insertions(+), 2 deletions(-)

diff --git a/net/vmw_vsock/virtio_transport_common.c 
b/net/vmw_vsock/virtio_transport_common.c
index 09475007165b..304ea424995d 100644
--- a/net/vmw_vsock/virtio_transport_common.c
+++ b/net/vmw_vsock/virtio_transport_common.c
@@ -420,6 +420,137 @@ static int virtio_transport_send_pkt_info(struct 
vsock_sock *vsk,
        return ret;
 }
 
+static bool virtio_transport_can_collapse(struct sk_buff *skb,
+                                         unsigned int size)
+{
+       /* skbs that are partially consumed, mark a SEQPACKET message boundary,
+        * or are already large enough should not be collapsed: they either
+        * need special accounting, carry protocol state, or already have a
+        * good data-to-overhead ratio.
+        */
+       if (VIRTIO_VSOCK_SKB_CB(skb)->offset)
+               return false;
+       if (le32_to_cpu(virtio_vsock_hdr(skb)->flags) & VIRTIO_VSOCK_SEQ_EOM)
+               return false;
+       if (skb->len >= size)
+               return false;
+       return true;
+}
+
+/* Iterate through the packets in the queue starting from the current skb to
+ * count the number of bytes we can collapse.
+ */
+static unsigned int
+virtio_transport_collapse_size(struct sk_buff *skb,
+                              struct sk_buff_head *queue,
+                              unsigned int max_size)
+{
+       unsigned int target = skb->len - VIRTIO_VSOCK_SKB_CB(skb)->offset;
+
+       while ((skb = skb_peek_next(skb, queue)) &&
+              virtio_transport_can_collapse(skb, max_size)) {
+               unsigned int len = skb->len - VIRTIO_VSOCK_SKB_CB(skb)->offset;
+
+               if (len > max_size - target)
+                       return target;
+
+               target += len;
+       }
+
+       return target;
+}
+
+/* Called under lock_sock when skb overhead exceeds the budget. */
+static void virtio_transport_collapse_rx_queue(struct virtio_vsock_sock *vvs)
+{
+       /* Use the same linear allocation threshold as virtio_vsock_alloc_skb()
+        * to avoid adding pressure on the page allocator.
+        */
+       unsigned int collapse_max = SKB_MAX_ORDER(VIRTIO_VSOCK_SKB_HEADROOM,
+                                                 PAGE_ALLOC_COSTLY_ORDER);
+       struct sk_buff *skb, *next_skb, *new_skb = NULL;
+       struct sk_buff_head new_queue;
+
+       __skb_queue_head_init(&new_queue);
+
+       skb_queue_walk_safe(&vvs->rx_queue, skb, next_skb) {
+               struct virtio_vsock_hdr *hdr = virtio_vsock_hdr(skb);
+               u32 src_off = VIRTIO_VSOCK_SKB_CB(skb)->offset;
+               u32 src_len = skb->len - src_off;
+               bool keep = false;
+
+               if (!virtio_transport_can_collapse(skb, collapse_max)) {
+                       /* Finalize pending collapsed skb to preserve packet
+                        * ordering.
+                        */
+                       if (new_skb) {
+                               __skb_queue_tail(&new_queue, new_skb);
+                               new_skb = NULL;
+                       }
+                       keep = true;
+                       goto next;
+               }
+
+               /* Finalize if this packet won't fit in the remaining tailroom,
+                * so we can allocate a right-sized new_skb.
+                */
+               if (new_skb && src_len > skb_tailroom(new_skb)) {
+                       __skb_queue_tail(&new_queue, new_skb);
+                       new_skb = NULL;
+               }
+
+               if (!new_skb) {
+                       unsigned int alloc_size;
+
+                       alloc_size = virtio_transport_collapse_size(skb, 
&vvs->rx_queue,
+                                                                   
collapse_max);
+
+                       /* Only this skb's data is eligible, nothing to merge
+                        * with. Keep as-is.
+                        */
+                       if (alloc_size <= src_len) {
+                               keep = true;
+                               goto next;
+                       }
+
+                       new_skb = virtio_vsock_alloc_linear_skb(alloc_size +
+                                       VIRTIO_VSOCK_SKB_HEADROOM, GFP_KERNEL);
+                       if (!new_skb)
+                               goto out;
+
+                       memcpy(virtio_vsock_hdr(new_skb), hdr,
+                              sizeof(struct virtio_vsock_hdr));
+                       virtio_vsock_hdr(new_skb)->len = 0;
+               }
+
+               /* Cannot fail since src_off/src_len are within bounds, but if
+                * it does, discard new_skb to avoid queuing corrupted data.
+                */
+               if (WARN_ON_ONCE(skb_copy_bits(skb, src_off,
+                                              skb_put(new_skb, src_len),
+                                              src_len))) {
+                       kfree_skb(new_skb);
+                       new_skb = NULL;
+                       goto out;
+               }
+
+               le32_add_cpu(&virtio_vsock_hdr(new_skb)->len, src_len);
+               virtio_vsock_hdr(new_skb)->flags |= hdr->flags;
+
+next:
+               __skb_unlink(skb, &vvs->rx_queue);
+               if (keep)
+                       __skb_queue_tail(&new_queue, skb);
+               else
+                       consume_skb(skb);
+       }
+out:
+       if (new_skb)
+               __skb_queue_tail(&new_queue, new_skb);
+
+       skb_queue_splice(&new_queue, &vvs->rx_queue);
+}
+
 static bool virtio_transport_inc_rx_pkt(struct virtio_vsock_sock *vvs,
                                        u32 len)
 {
@@ -1363,8 +1494,21 @@ virtio_transport_recv_enqueue(struct vsock_sock *vsk,
        spin_lock_bh(&vvs->rx_lock);
 
        can_enqueue = virtio_transport_inc_rx_pkt(vvs, len);
-       if (!can_enqueue)
-               goto out;
+       if (!can_enqueue) {
+               /* Try to collapse the receive queue to reduce skb overhead and
+                * make room for this packet.
+                * Unlock rx_lock since the collapse may sleep or, in any case,
+                * take some time to collapse the skbs, but this is safe, since
+                * sk_lock is held by caller so no one else can enqueue or
+                * dequeue.
+                */
+               spin_unlock_bh(&vvs->rx_lock);
+               virtio_transport_collapse_rx_queue(vvs);
+               spin_lock_bh(&vvs->rx_lock);
+               can_enqueue = virtio_transport_inc_rx_pkt(vvs, len);
+               if (!can_enqueue)
+                       goto out;
+       }
 
        if (le32_to_cpu(hdr->flags) & VIRTIO_VSOCK_SEQ_EOM)
                vvs->msg_count++;
-- 
2.54.0


Reply via email to