On Thu, Jun 18, 2026 at 5:27 PM Tamir Duberstein <[email protected]> wrote: > > After consuming the last visible record, ringbuf_process_ring() > publishes the consumer position and checks the producer position. These > operations lack a full StoreLoad barrier. A producer can therefore > commit a new record but read the old consumer position while the > consumer reads the old producer position. The producer sends no > notification and the consumer waits despite a queued record. > > Insert a full barrier between publishing a consumer position and the > next producer position load. When a record bound or callback ends the > current invocation first, execute the barrier before returning so the > load in a later invocation completes the same handshake. > > Add an edge-triggered epoll test that drains one record per call while a > concurrent producer fills the ring. Without the barrier, a missed > notification leaves the producer dropping records from a full ring while > the consumer times out. Document that bounded consumers and callbacks > that terminate consumption must drain before waiting again. > > Fixes: bf99c936f947 ("libbpf: Add BPF ring buffer support") > Reported-by: Andrew Werner <[email protected]> > Reported-by: Sashiko <[email protected]> > Closes: > https://lore.kernel.org/bpf/[email protected]/ > Assisted-by: Codex:gpt-5.5 > Signed-off-by: Tamir Duberstein <[email protected]> > --- > tools/lib/bpf/libbpf.h | 23 +++++++ > tools/lib/bpf/ringbuf.c | 24 +++++-- > tools/testing/selftests/bpf/prog_tests/ringbuf.c | 83 > ++++++++++++++++++++++++ > 3 files changed, 123 insertions(+), 7 deletions(-) > > diff --git a/tools/lib/bpf/libbpf.h b/tools/lib/bpf/libbpf.h > index ae46b17feaa6..3a649ed87034 100644 > --- a/tools/lib/bpf/libbpf.h > +++ b/tools/lib/bpf/libbpf.h > @@ -1440,6 +1440,11 @@ struct ring; > struct user_ring_buffer; > > /* Callback-based consumption is unsupported for BPF_F_RB_OVERWRITE maps. */ > +/* > + * A negative return stops consumption; non-negative values continue. > Stopping > + * can leave records queued without a new readiness notification. Before > + * waiting for readiness again, consume until no records remain. > + */ > typedef int (*ring_buffer_sample_fn)(void *ctx, void *data, size_t size); > > struct ring_buffer_opts { > @@ -1456,6 +1461,20 @@ LIBBPF_API int ring_buffer__add(struct ring_buffer > *rb, int map_fd, > ring_buffer_sample_fn sample_cb, void *ctx); > LIBBPF_API int ring_buffer__poll(struct ring_buffer *rb, int timeout_ms); > LIBBPF_API int ring_buffer__consume(struct ring_buffer *rb); > + > +/** > + * @brief **ring_buffer__consume_n()** consumes up to a requested number of > + * records from a ring buffer manager without event polling. > + * > + * @param rb A ring buffer manager object. > + * @param n Maximum number of records to consume. > + * @return The number of records consumed, or a negative error code on > failure. > + * > + * Reaching the requested bound does not establish that every ring is empty. > + * Records can remain queued without a new readiness notification. Before > + * calling ring_buffer__poll() or waiting on ring_buffer__epoll_fd(), call > + * ring_buffer__consume() until it returns 0. > + */ > LIBBPF_API int ring_buffer__consume_n(struct ring_buffer *rb, size_t n); > LIBBPF_API int ring_buffer__epoll_fd(const struct ring_buffer *rb); > > @@ -1538,6 +1557,10 @@ LIBBPF_API int ring__consume(struct ring *r); > * @param r A ringbuffer object. > * @param n Maximum number of records to consume. > * @return The number of records consumed, or a negative error code on > failure. > + * > + * Reaching the requested bound does not establish that the ring is empty. > + * Records can remain queued without a new readiness notification. Before > + * waiting on ring__map_fd(), call ring__consume() until it returns 0. > */ > LIBBPF_API int ring__consume_n(struct ring *r, size_t n); > > diff --git a/tools/lib/bpf/ringbuf.c b/tools/lib/bpf/ringbuf.c > index 141f2cbe56eb..0598f6c2f7da 100644 > --- a/tools/lib/bpf/ringbuf.c > +++ b/tools/lib/bpf/ringbuf.c > @@ -271,7 +271,7 @@ static int64_t ringbuf_process_ring(struct ring *r, > size_t n) > return 0; > > cons_pos = __atomic_load_n(r->consumer_pos, __ATOMIC_ACQUIRE); > - do { > + for (;;) { > got_new_data = false; > prod_pos = __atomic_load_n(r->producer_pos, __ATOMIC_ACQUIRE); > /* Positions wrap; the consumer cannot logically pass the > producer. */ > @@ -279,9 +279,9 @@ static int64_t ringbuf_process_ring(struct ring *r, > size_t n) > len_ptr = r->data + (cons_pos & r->mask); > len = __atomic_load_n(len_ptr, __ATOMIC_ACQUIRE); > > - /* sample not committed yet, bail out for now */ > + /* Retry a busy record once after publishing prior > records. */ > if (len & BPF_RINGBUF_BUSY_BIT) > - goto done; > + break;
so we do this one time retry just so that we can reuse the same memory barrier, is that right? and similarly all those error-returning conditions with cnt = err. Wouldn't it be cleaner and easier to mentally follow to have `goto done` which would just duplicate memory barrier and exit with whatever value err is? > > got_new_data = true; > cons_pos += roundup_len(len); [...]

