On Sat Jun 13, 2026 at 9:48 PM EDT, Tamir Duberstein wrote:
> After consuming the last visible record, ringbuf_process_ring()
> publishes the consumer position and checks the producer position. These
> operations lack a full StoreLoad barrier. A producer can therefore
> commit a new record but read the old consumer position while the
> consumer reads the old producer position. The producer sends no
> notification and the consumer waits despite a queued record.
>
> Insert a full barrier before checking for new data, ensuring that either
> the consumer observes the producer update or the producer observes the
> consumer update and sends a notification. Apply the same handshake when
> a busy record follows records whose consumer position was published.
>
> Add an edge-triggered epoll test with a concurrent producer. Without the
> barrier, a missed notification leaves the producer dropping records from
> a full ring while the consumer times out. Document that bounded
> consumers and callbacks that terminate consumption must drain before
> waiting again.
>
> Fixes: bf99c936f947 ("libbpf: Add BPF ring buffer support")
> Reported-by: Andrew Werner <[email protected]>
> Assisted-by: Codex:gpt-5.5
> Signed-off-by: Tamir Duberstein <[email protected]>
> ---
>  tools/lib/bpf/libbpf.h                           | 22 +++++++
>  tools/lib/bpf/ringbuf.c                          | 14 +++-
>  tools/testing/selftests/bpf/prog_tests/ringbuf.c | 84 
> ++++++++++++++++++++++++
>  3 files changed, 117 insertions(+), 3 deletions(-)
>
> diff --git a/tools/lib/bpf/libbpf.h b/tools/lib/bpf/libbpf.h
> index 9ba6b9ad3498..a3b8f606a91d 100644
> --- a/tools/lib/bpf/libbpf.h
> +++ b/tools/lib/bpf/libbpf.h
> @@ -1439,6 +1439,10 @@ struct ring_buffer;
>  struct ring;
>  struct user_ring_buffer;
>  
> +/* A negative return stops consumption; non-negative values continue. 
> Stopping

Sashiko is right about this; let's use the regular kernel comment style for
new comments.

> + * can leave records queued without a new readiness notification. Before
> + * waiting for readiness again, consume until no records remain.
> + */
>  typedef int (*ring_buffer_sample_fn)(void *ctx, void *data, size_t size);
>  
>  struct ring_buffer_opts {
> @@ -1455,6 +1459,20 @@ LIBBPF_API int ring_buffer__add(struct ring_buffer 
> *rb, int map_fd,
>                               ring_buffer_sample_fn sample_cb, void *ctx);
>  LIBBPF_API int ring_buffer__poll(struct ring_buffer *rb, int timeout_ms);
>  LIBBPF_API int ring_buffer__consume(struct ring_buffer *rb);
> +
> +/**
> + * @brief **ring_buffer__consume_n()** consumes up to a requested number of
> + * records from a ring buffer manager without event polling.
> + *
> + * @param rb A ring buffer manager object.
> + * @param n Maximum number of records to consume.
> + * @return The number of records consumed, or a negative error code on 
> failure.
> + *
> + * Reaching the requested bound does not establish that every ring is empty.
> + * Records can remain queued without a new readiness notification. Before
> + * waiting on ring_buffer__epoll_fd(), call ring_buffer__consume() until it
> + * returns 0.
> + */
>  LIBBPF_API int ring_buffer__consume_n(struct ring_buffer *rb, size_t n);
>  LIBBPF_API int ring_buffer__epoll_fd(const struct ring_buffer *rb);
>  
> @@ -1537,6 +1555,10 @@ LIBBPF_API int ring__consume(struct ring *r);
>   * @param r A ringbuffer object.
>   * @param n Maximum number of records to consume.
>   * @return The number of records consumed, or a negative error code on 
> failure.
> + *
> + * Reaching the requested bound does not establish that the ring is empty.
> + * Records can remain queued without a new readiness notification. Before
> + * waiting on ring__map_fd(), call ring__consume() until it returns 0.
>   */
>  LIBBPF_API int ring__consume_n(struct ring *r, size_t n);
>  
> diff --git a/tools/lib/bpf/ringbuf.c b/tools/lib/bpf/ringbuf.c
> index 1c24a83f59d5..ea8909fec4e9 100644
> --- a/tools/lib/bpf/ringbuf.c
> +++ b/tools/lib/bpf/ringbuf.c
> @@ -255,7 +255,7 @@ static int64_t ringbuf_process_ring(struct ring *r, 
> size_t n)
>       /* 64-bit to avoid overflow in case of extreme application behavior */
>       int64_t cnt = 0;
>       unsigned long cons_pos, prod_pos;
> -     bool got_new_data;
> +     bool got_new_data, needs_wakeup = false;

The extra variable feels overly complicated. I think this is because the
new code follows the got_new_data pattern, which IMO is also unnecessary.
We should remove got_new_data entirely and just do:

while (true) {
        prod_pos = __atomic_load_n(r->producer_pos, __ATOMIC_ACQUIRE);
        if (cons_pos == prod_pos)
                break;

        while (cons_pos != prod_pos) {
                ...
        }
}

>       void *sample;
>  
>       err = ringbuf_validate(r);
> @@ -267,14 +267,21 @@ static int64_t ringbuf_process_ring(struct ring *r, 
> size_t n)
>       cons_pos = __atomic_load_n(r->consumer_pos, __ATOMIC_ACQUIRE);
>       do {
>               got_new_data = false;
> +             if (needs_wakeup) {
> +                     /* Ensure either this sees a new record or its producer 
> sees
> +                      * the updated consumer position and sends a 
> notification.
> +                      */
> +                     __atomic_thread_fence(__ATOMIC_SEQ_CST);
> +                     needs_wakeup = false;
> +             }

Now, regarding this: to address Sashiko's warning, instead of the if
above...

>               prod_pos = __atomic_load_n(r->producer_pos, __ATOMIC_ACQUIRE);
>               while (cons_pos != prod_pos) {
>                       len_ptr = r->data + (cons_pos & r->mask);
>                       len = __atomic_load_n(len_ptr, __ATOMIC_ACQUIRE);
>  
> -                     /* sample not committed yet, bail out for now */
> +                     /* Retry a busy record once after publishing prior 
> records. */
>                       if (len & BPF_RINGBUF_BUSY_BIT)
> -                             goto done;
> +                             break;
>  
>                       got_new_data = true;
>                       cons_pos += roundup_len(len);
> @@ -294,6 +301,7 @@ static int64_t ringbuf_process_ring(struct ring *r, 
> size_t n)
>  
>                       __atomic_store_n(r->consumer_pos, cons_pos,
>                                        __ATOMIC_RELEASE);
> +                     needs_wakeup = true;
>  
>                       if (cnt >= n)
>                               goto done;

We just add an smp_mb() here. And since, AFAICT, smp_mb() is available
here, we can also avoid the switch from smp_* to __atomic_* in the previous
patch.

> diff --git a/tools/testing/selftests/bpf/prog_tests/ringbuf.c 
> b/tools/testing/selftests/bpf/prog_tests/ringbuf.c
> index 9ce996bcea8c..5f0c679bf9a6 100644
> --- a/tools/testing/selftests/bpf/prog_tests/ringbuf.c
> +++ b/tools/testing/selftests/bpf/prog_tests/ringbuf.c
> @@ -492,6 +492,88 @@ static void ringbuf_null_cb_subtest(void)
>       test_ringbuf_n_lskel__destroy(skel_n);
>  }
>  
> +#define N_WAKEUP_SAMPLES 20000
> +
> +struct wakeup_ctx {
> +     bool stop;
> +};
> +
> +static void *wakeup_producer(void *arg)
> +{
> +     struct wakeup_ctx *ctx = arg;
> +
> +     while (!__atomic_load_n(&ctx->stop, __ATOMIC_RELAXED))

Why __ATOMIC_RELAXED here? Maybe just declare the variable volatile and
do regular reads?

> +             syscall(__NR_getpgid);
> +     return NULL;
> +}
> +
> +static void ringbuf_wakeup_subtest(void)
> +{
> +     struct test_ringbuf_n_lskel *skel_n;
> +     struct ring_buffer *ringbuf = NULL;
> +     struct epoll_event event = {
> +             .events = EPOLLIN | EPOLLET,
> +     };
> +     struct wakeup_ctx ctx = {};
> +     pthread_t producer;
> +     int epoll_fd = -1;
> +     int err, total = 0;
> +
> +     skel_n = test_ringbuf_n_lskel__open();
> +     if (!ASSERT_OK_PTR(skel_n, "test_ringbuf_n_lskel__open"))
> +             return;
> +
> +     skel_n->maps.ringbuf.max_entries = getpagesize();
> +     skel_n->bss->pid = getpid();
> +     skel_n->bss->value = SAMPLE_VALUE;
> +
> +     err = test_ringbuf_n_lskel__load(skel_n);
> +     if (!ASSERT_OK(err, "test_ringbuf_n_lskel__load"))
> +             goto cleanup;
> +
> +     err = test_ringbuf_n_lskel__attach(skel_n);
> +     if (!ASSERT_OK(err, "test_ringbuf_n_lskel__attach"))
> +             goto cleanup;
> +
> +     ringbuf = ring_buffer__new(skel_n->maps.ringbuf.map_fd,
> +                                process_noop_sample, NULL, NULL);
> +     if (!ASSERT_OK_PTR(ringbuf, "ring_buffer__new"))
> +             goto cleanup;
> +
> +     epoll_fd = epoll_create1(EPOLL_CLOEXEC);
> +     if (!ASSERT_OK_FD(epoll_fd, "epoll_create1"))
> +             goto cleanup_ringbuf;
> +
> +     err = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, skel_n->maps.ringbuf.map_fd,
> +                     &event);
> +     if (!ASSERT_OK(err, "epoll_ctl"))
> +             goto cleanup_epoll;
> +
> +     err = pthread_create(&producer, NULL, wakeup_producer, &ctx);
> +     if (!ASSERT_OK(err, "pthread_create"))
> +             goto cleanup_epoll;
> +
> +     while (total < N_WAKEUP_SAMPLES) {
> +             err = epoll_wait(epoll_fd, &event, 1, 1000);
> +             if (!ASSERT_EQ(err, 1, "epoll_wait"))
> +                     goto cleanup_thread;
> +             while ((err = ring_buffer__consume(ringbuf)) > 0)
> +                     total += err;
> +             if (!ASSERT_OK(err, "ring_buffer__consume"))
> +                     goto cleanup_thread;

The label is unnecessary; just use break.

> +     }
> +
> +cleanup_thread:
> +     __atomic_store_n(&ctx.stop, true, __ATOMIC_RELAXED);
> +     pthread_join(producer, NULL);
> +cleanup_epoll:
> +     close(epoll_fd);
> +cleanup_ringbuf:
> +     ring_buffer__free(ringbuf);
> +cleanup:
> +     test_ringbuf_n_lskel__destroy(skel_n);
> +}
> +
>  static void ringbuf_n_subtest(void)
>  {
>       struct test_ringbuf_n_lskel *skel_n;
> @@ -672,6 +754,8 @@ void test_ringbuf(void)
>               ringbuf_n_subtest();
>       if (test__start_subtest("ringbuf_null_cb"))
>               ringbuf_null_cb_subtest();
> +     if (test__start_subtest("ringbuf_wakeup"))
> +             ringbuf_wakeup_subtest();
>       if (test__start_subtest("ringbuf_map_key"))
>               ringbuf_map_key_subtest();
>       if (test__start_subtest("ringbuf_write"))


pw-bot: cr

Reply via email to