Hi Anton,

Thanks for taking a look! Also thanks for the other explanation.
On Thu, 2024-07-04 at 08:21 +0100, anton.iva...@cambridgegreys.com wrote:
> From: Anton Ivanov <anton.iva...@cambridgegreys.com>
>
> UML vector drivers use ring buffer structures which map
> preallocated skbs onto mmsg vectors for use with sendmmsg
> and recvmmsg. They are designed around a single consumer,
> single producer pattern allowing simultaneous enqueue and
> dequeue.

Right, I didn't grok that from just the code yesterday :)

> Lock debugging with preemption showed possible races when
> locking the queue depth. This patch addresses this by
> removing extra locks while making queue depth inc/dec and
> access atomic.

I don't think this is fully correct in SMP, and I think we should fix
it even if we don't have SMP (yet?)

See, the other thing about spinlocks is that they serve as barriers,
and I don't think the atomics you use have "enough" of them (per the
docs in atomic_t.txt)?

If you have concurrent producers and consumers, you really want the
producer to actually have fully prepared the data before it's visible
to the consumer, which means you need a full barrier before
incrementing queue_depth. But you're now incrementing it by just
atomic_add(), and atomic_t.txt says:

 - RMW operations that have no return value are unordered;

So I think the producer should have

	/*
	 * adding to queue_depth makes the prepared buffers
	 * visible to the consumer, ensure they're actually
	 * completely written/visible before
	 */
	smp_mb__before_atomic();
	atomic_add(...);

Or so?

> +	atomic_sub(advance, &qi->queue_depth);
>
> -	if (qi->queue_depth == 0) {
> -		qi->head = 0;
> -		qi->tail = 0;
> -	}
> -	queue_depth = qi->queue_depth;
> -	spin_unlock(&qi->tail_lock);
> -	return queue_depth;
> +	return atomic_read(&qi->queue_depth);

Or just use atomic_sub_return()? Though that also implies a barrier,
which I think isn't needed on the consumer side.

However I think it's clearer to just remove the return value and make
this function void.

>  static int vector_advancetail(struct vector_queue *qi, int advance)
>  {
> -	int queue_depth;
> -
>  	qi->tail =
>  		(qi->tail + advance)
>  			% qi->max_depth;
> -	spin_lock(&qi->head_lock);
> -	qi->queue_depth += advance;
> -	queue_depth = qi->queue_depth;
> -	spin_unlock(&qi->head_lock);
> -	return queue_depth;
> +	atomic_add(advance, &qi->queue_depth);
> +	return atomic_read(&qi->queue_depth);
>  }

Or maybe here, instead of the barrier, use atomic_add_return(), which
implies a full barrier on both sides? But I don't think you need that,
the barrier before is enough.
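To put those three comments together, this is roughly the shape I have
in mind. Just an untested sketch that only shows the lines around the
queue_depth update, with the rest of the two helpers elided:

	/* producer side (tail advance): the depth update is what
	 * publishes the prepared buffers to the consumer, and a
	 * plain atomic_add() is unordered, so order the buffer
	 * writes above against it
	 */
	qi->tail = (qi->tail + advance) % qi->max_depth;
	smp_mb__before_atomic();
	atomic_add(advance, &qi->queue_depth);

	/* consumer side (head advance): I don't think a barrier is
	 * needed here, and the depth return value can go away so
	 * the helper becomes void
	 */
	qi->head = (qi->head + advance) % qi->max_depth;
	atomic_sub(advance, &qi->queue_depth);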
> @@ -411,61 +386,58 @@ static int vector_send(struct vector_queue *qi)
>  	int result = 0, send_len, queue_depth = qi->max_depth;
>
>  	if (spin_trylock(&qi->head_lock)) {

[...]

> +		/* update queue_depth to current value */
> +		queue_depth = atomic_read(&qi->queue_depth);
> +		while (queue_depth > 0) {

I think it'd be clearer to write this as

	while ((queue_depth = atomic_read(...))) {

and simply not assign the return value of consume_vector_skbs() to
queue_depth here:

[...]

> +			if (result > 0) {
> +				queue_depth =
> +					consume_vector_skbs(qi, result);
> +				/* This is equivalent to an TX IRQ.
> +				 * Restart the upper layers to feed us
> +				 * more packets.
> +				 */
> -			if (result != send_len) {
> -				vp->estats.tx_restart_queue++;
> -				break;
> -			}
> +				if (result > vp->estats.tx_queue_max)
> +					vp->estats.tx_queue_max = result;
> +				vp->estats.tx_queue_running_average =
> +					(vp->estats.tx_queue_running_average + result) >> 1;
> +			}
> +			netif_wake_queue(qi->dev);
> +			/* if TX is busy, break out of the send loop,
> +			 * poll write IRQ will reschedule xmit for us
> +			 */
> +			if (result != send_len) {
> +				vp->estats.tx_restart_queue++;
> +				break;
> 			}
> 		}

The loop doesn't need queue_depth until it goes back to the beginning
anyway. (It probably also never even executes twice unless you
actually have SMP or preemption?)

> @@ -675,11 +647,13 @@ static void prep_queue_for_rx(struct vector_queue *qi)
>  	struct vector_private *vp = netdev_priv(qi->dev);
>  	struct mmsghdr *mmsg_vector = qi->mmsg_vector;
>  	void **skbuff_vector = qi->skbuff_vector;
> -	int i;
> +	int i, queue_depth;
> +
> +	queue_depth = atomic_read(&qi->queue_depth);
>
> -	if (qi->queue_depth == 0)
> +	if (queue_depth == 0)
>  		return;
> -	for (i = 0; i < qi->queue_depth; i++) {
> +	for (i = 0; i < queue_depth; i++) {
>  		/* it is OK if allocation fails - recvmmsg with NULL data in
>  		 * iov argument still performs an RX, just drops the packet
>  		 * This allows us stop faffing around with a "drop buffer"
> @@ -689,7 +663,7 @@ static void prep_queue_for_rx(struct vector_queue *qi)
>  		skbuff_vector++;
>  		mmsg_vector++;
>  	}
> -	qi->queue_depth = 0;
> +	atomic_set(&qi->queue_depth, 0);

atomic_read() and then atomic_set() rather than atomic_sub() seems
questionable, but you didn't have locks here before so maybe this is
somehow logically never in parallel? But it is done in NAPI polling
via vector_mmsg_rx(), so not sure I understand.

> @@ -987,7 +961,7 @@ static int vector_mmsg_rx(struct vector_private *vp, int budget)
>  	 * many do we need to prep the next time prep_queue_for_rx() is called.
>  	 */
>
> -	qi->queue_depth = packet_count;
> +	atomic_add(packet_count, &qi->queue_depth);
>
>  	for (i = 0; i < packet_count; i++) {
>  		skb = (*skbuff_vector);

especially since here you _did_ convert to atomic_add().

johannes