A rolling buffer is a series of folios held in a list of folio_queues.  New
folios and folio_queue structs may be inserted at the head at the same time
as spent ones are removed from the tail, without the need for locking.
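
As an illustrative sketch only (not part of this patch; debug_id, folio,
amount and slot are stand-in variables), the intended usage of the helpers
added below is roughly:

	struct rolling_buffer roll;

	/* Producer side: set up the buffer and append folios at the head. */
	if (rolling_buffer_init(&roll, debug_id, ITER_SOURCE) < 0)
		return -ENOMEM;
	if (rolling_buffer_append(&roll, folio, ROLLBUF_MARK_1) < 0)
		return -ENOMEM;

	/* Consumer side: advance over consumed data and, once a tail segment
	 * is spent, free it.  The final segment is never freed this way;
	 * rolling_buffer_delete_spent() returns NULL instead.
	 */
	rolling_buffer_advance(&roll, amount);
	if (slot >= folioq_nr_slots(roll.tail)) {
		if (rolling_buffer_delete_spent(&roll))
			slot = 0;
	}

	/* Teardown: free all remaining segments, putting marked folios. */
	rolling_buffer_clear(&roll);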

The rolling buffer includes an iov_iter, and this has to be managed
carefully as the list of folio_queues is extended so that an oops isn't
incurred because the iterator was left pointing to the end of a folio_queue
segment that got appended to and then removed.
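
For example, when a new folio_queue segment is appended (see
rolling_buffer_make_space() below), the iterator must be nudged off the end
of the old head segment before the new one is published, because the
consumer may free the old head as soon as ->next is set.  A sketch of that
step:

	/* If the master iterator is parked one-past-the-end of the old head
	 * segment, repoint it at the new segment (fq) before publishing
	 * ->next; after that store, the consumer may free the old head at
	 * any time.
	 */
	if (roll->iter.folioq == head &&
	    roll->iter.folioq_slot == folioq_nr_slots(head)) {
		roll->iter.folioq = fq;
		roll->iter.folioq_slot = 0;
	}
	smp_store_release(&head->next, fq);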

We need to use the mechanism twice, once for read and once for write; in
future patches, a second rolling buffer will be used to handle bounce
buffering for content encryption.

Signed-off-by: David Howells <dhowe...@redhat.com>
cc: Jeff Layton <jlay...@kernel.org>
cc: ne...@lists.linux.dev
cc: linux-fsde...@vger.kernel.org
---
 fs/netfs/Makefile              |   1 +
 fs/netfs/buffered_read.c       | 119 ++++-------------
 fs/netfs/direct_read.c         |  14 +-
 fs/netfs/direct_write.c        |  10 +-
 fs/netfs/internal.h            |   4 -
 fs/netfs/misc.c                | 147 ---------------------
 fs/netfs/objects.c             |   2 +-
 fs/netfs/read_pgpriv2.c        |  32 ++---
 fs/netfs/read_retry.c          |   2 +-
 fs/netfs/rolling_buffer.c      | 226 +++++++++++++++++++++++++++++++++
 fs/netfs/write_collect.c       |  19 +--
 fs/netfs/write_issue.c         |  26 ++--
 include/linux/netfs.h          |  10 +-
 include/linux/rolling_buffer.h |  61 +++++++++
 include/trace/events/netfs.h   |   2 +
 15 files changed, 375 insertions(+), 300 deletions(-)
 create mode 100644 fs/netfs/rolling_buffer.c
 create mode 100644 include/linux/rolling_buffer.h

diff --git a/fs/netfs/Makefile b/fs/netfs/Makefile
index d08b0bfb6756..7492c4aa331e 100644
--- a/fs/netfs/Makefile
+++ b/fs/netfs/Makefile
@@ -13,6 +13,7 @@ netfs-y := \
        read_collect.o \
        read_pgpriv2.o \
        read_retry.o \
+       rolling_buffer.o \
        write_collect.o \
        write_issue.o
 
diff --git a/fs/netfs/buffered_read.c b/fs/netfs/buffered_read.c
index 7ec04d5628d8..db874fea8794 100644
--- a/fs/netfs/buffered_read.c
+++ b/fs/netfs/buffered_read.c
@@ -63,37 +63,6 @@ static int netfs_begin_cache_read(struct netfs_io_request *rreq, struct netfs_in
        return fscache_begin_read_operation(&rreq->cache_resources, netfs_i_cookie(ctx));
 }
 
-/*
- * Decant the list of folios to read into a rolling buffer.
- */
-static size_t netfs_load_buffer_from_ra(struct netfs_io_request *rreq,
-                                       struct folio_queue *folioq,
-                                       struct folio_batch *put_batch)
-{
-       unsigned int order, nr;
-       size_t size = 0;
-
-       nr = __readahead_batch(rreq->ractl, (struct page **)folioq->vec.folios,
-                              ARRAY_SIZE(folioq->vec.folios));
-       folioq->vec.nr = nr;
-       for (int i = 0; i < nr; i++) {
-               struct folio *folio = folioq_folio(folioq, i);
-
-               trace_netfs_folio(folio, netfs_folio_trace_read);
-               order = folio_order(folio);
-               folioq->orders[i] = order;
-               size += PAGE_SIZE << order;
-
-               if (!folio_batch_add(put_batch, folio))
-                       folio_batch_release(put_batch);
-       }
-
-       for (int i = nr; i < folioq_nr_slots(folioq); i++)
-               folioq_clear(folioq, i);
-
-       return size;
-}
-
 /*
  * netfs_prepare_read_iterator - Prepare the subreq iterator for I/O
  * @subreq: The subrequest to be set up
@@ -128,18 +97,12 @@ static ssize_t netfs_prepare_read_iterator(struct netfs_io_subrequest *subreq)
 
                folio_batch_init(&put_batch);
                while (rreq->submitted < subreq->start + rsize) {
-                       struct folio_queue *tail = rreq->buffer_tail, *new;
-                       size_t added;
-
-                       new = netfs_folioq_alloc(rreq->debug_id, GFP_NOFS,
-                                                netfs_trace_folioq_alloc_read_prep);
-                       if (!new)
-                               return -ENOMEM;
-                       new->prev = tail;
-                       tail->next = new;
-                       rreq->buffer_tail = new;
-                       added = netfs_load_buffer_from_ra(rreq, new, &put_batch);
-                       rreq->iter.count += added;
+                       ssize_t added;
+
+                       added = rolling_buffer_load_from_ra(&rreq->buffer, rreq->ractl,
+                                                           &put_batch);
+                       if (added < 0)
+                               return added;
                        rreq->submitted += added;
                }
                folio_batch_release(&put_batch);
@@ -147,7 +110,7 @@ static ssize_t netfs_prepare_read_iterator(struct netfs_io_subrequest *subreq)
 
        subreq->len = rsize;
        if (unlikely(rreq->io_streams[0].sreq_max_segs)) {
-               size_t limit = netfs_limit_iter(&rreq->iter, 0, rsize,
+               size_t limit = netfs_limit_iter(&rreq->buffer.iter, 0, rsize,
                                                rreq->io_streams[0].sreq_max_segs);
 
                if (limit < rsize) {
@@ -156,20 +119,16 @@ static ssize_t netfs_prepare_read_iterator(struct netfs_io_subrequest *subreq)
                }
        }
 
-       subreq->io_iter = rreq->iter;
+       subreq->io_iter = rreq->buffer.iter;
 
        if (iov_iter_is_folioq(&subreq->io_iter)) {
-               if (subreq->io_iter.folioq_slot >= folioq_nr_slots(subreq->io_iter.folioq)) {
-                       subreq->io_iter.folioq = subreq->io_iter.folioq->next;
-                       subreq->io_iter.folioq_slot = 0;
-               }
                subreq->curr_folioq = (struct folio_queue *)subreq->io_iter.folioq;
                subreq->curr_folioq_slot = subreq->io_iter.folioq_slot;
                subreq->curr_folio_order = subreq->curr_folioq->orders[subreq->curr_folioq_slot];
        }
 
        iov_iter_truncate(&subreq->io_iter, subreq->len);
-       iov_iter_advance(&rreq->iter, subreq->len);
+       rolling_buffer_advance(&rreq->buffer, subreq->len);
        return subreq->len;
 }
 
@@ -352,34 +311,6 @@ static int netfs_wait_for_read(struct netfs_io_request *rreq)
        return ret;
 }
 
-/*
- * Set up the initial folioq of buffer folios in the rolling buffer and set the
- * iterator to refer to it.
- */
-static int netfs_prime_buffer(struct netfs_io_request *rreq)
-{
-       struct folio_queue *folioq;
-       struct folio_batch put_batch;
-       size_t added;
-
-       folioq = netfs_folioq_alloc(rreq->debug_id, GFP_KERNEL,
-                                   netfs_trace_folioq_alloc_read_prime);
-       if (!folioq)
-               return -ENOMEM;
-
-       rreq->buffer = folioq;
-       rreq->buffer_tail = folioq;
-       rreq->submitted = rreq->start;
-       iov_iter_folio_queue(&rreq->iter, ITER_DEST, folioq, 0, 0, 0);
-
-       folio_batch_init(&put_batch);
-       added = netfs_load_buffer_from_ra(rreq, folioq, &put_batch);
-       folio_batch_release(&put_batch);
-       rreq->iter.count += added;
-       rreq->submitted += added;
-       return 0;
-}
-
 /**
  * netfs_readahead - Helper to manage a read request
  * @ractl: The description of the readahead request
@@ -419,7 +350,8 @@ void netfs_readahead(struct readahead_control *ractl)
        netfs_rreq_expand(rreq, ractl);
 
        rreq->ractl = ractl;
-       if (netfs_prime_buffer(rreq) < 0)
+       rreq->submitted = rreq->start;
+       if (rolling_buffer_init(&rreq->buffer, rreq->debug_id, ITER_DEST) < 0)
                goto cleanup_free;
        netfs_read_to_pagecache(rreq);
 
@@ -435,22 +367,18 @@ EXPORT_SYMBOL(netfs_readahead);
 /*
  * Create a rolling buffer with a single occupying folio.
  */
-static int netfs_create_singular_buffer(struct netfs_io_request *rreq, struct folio *folio)
+static int netfs_create_singular_buffer(struct netfs_io_request *rreq, struct folio *folio,
+                                       unsigned int rollbuf_flags)
 {
-       struct folio_queue *folioq;
+       ssize_t added;
 
-       folioq = netfs_folioq_alloc(rreq->debug_id, GFP_KERNEL,
-                                   netfs_trace_folioq_alloc_read_sing);
-       if (!folioq)
+       if (rolling_buffer_init(&rreq->buffer, rreq->debug_id, ITER_DEST) < 0)
                return -ENOMEM;
 
-       folioq_append(folioq, folio);
-       BUG_ON(folioq_folio(folioq, 0) != folio);
-       BUG_ON(folioq_folio_order(folioq, 0) != folio_order(folio));
-       rreq->buffer = folioq;
-       rreq->buffer_tail = folioq;
-       rreq->submitted = rreq->start + rreq->len;
-       iov_iter_folio_queue(&rreq->iter, ITER_DEST, folioq, 0, 0, rreq->len);
+       added = rolling_buffer_append(&rreq->buffer, folio, rollbuf_flags);
+       if (added < 0)
+               return added;
+       rreq->submitted = rreq->start + added;
        rreq->ractl = (struct readahead_control *)1UL;
        return 0;
 }
@@ -518,7 +446,7 @@ static int netfs_read_gaps(struct file *file, struct folio *folio)
        }
        if (to < flen)
                bvec_set_folio(&bvec[i++], folio, flen - to, to);
-       iov_iter_bvec(&rreq->iter, ITER_DEST, bvec, i, rreq->len);
+       iov_iter_bvec(&rreq->buffer.iter, ITER_DEST, bvec, i, rreq->len);
        rreq->submitted = rreq->start + flen;
 
        netfs_read_to_pagecache(rreq);
@@ -586,7 +514,7 @@ int netfs_read_folio(struct file *file, struct folio *folio)
        trace_netfs_read(rreq, rreq->start, rreq->len, netfs_read_trace_readpage);
 
        /* Set up the output buffer */
-       ret = netfs_create_singular_buffer(rreq, folio);
+       ret = netfs_create_singular_buffer(rreq, folio, 0);
        if (ret < 0)
                goto discard;
 
@@ -743,7 +671,7 @@ int netfs_write_begin(struct netfs_inode *ctx,
        trace_netfs_read(rreq, pos, len, netfs_read_trace_write_begin);
 
        /* Set up the output buffer */
-       ret = netfs_create_singular_buffer(rreq, folio);
+       ret = netfs_create_singular_buffer(rreq, folio, 0);
        if (ret < 0)
                goto error_put;
 
@@ -808,11 +736,10 @@ int netfs_prefetch_for_write(struct file *file, struct folio *folio,
        trace_netfs_read(rreq, start, flen, netfs_read_trace_prefetch_for_write);
 
        /* Set up the output buffer */
-       ret = netfs_create_singular_buffer(rreq, folio);
+       ret = netfs_create_singular_buffer(rreq, folio, NETFS_ROLLBUF_PAGECACHE_MARK);
        if (ret < 0)
                goto error_put;
 
-       folioq_mark2(rreq->buffer, 0);
        netfs_read_to_pagecache(rreq);
        ret = netfs_wait_for_read(rreq);
        netfs_put_request(rreq, false, netfs_rreq_trace_put_return);
diff --git a/fs/netfs/direct_read.c b/fs/netfs/direct_read.c
index b1a66a6e6bc2..a3f23adbae0f 100644
--- a/fs/netfs/direct_read.c
+++ b/fs/netfs/direct_read.c
@@ -25,7 +25,7 @@ static void netfs_prepare_dio_read_iterator(struct netfs_io_subrequest *subreq)
        subreq->len = rsize;
 
        if (unlikely(rreq->io_streams[0].sreq_max_segs)) {
-               size_t limit = netfs_limit_iter(&rreq->iter, 0, rsize,
+               size_t limit = netfs_limit_iter(&rreq->buffer.iter, 0, rsize,
                                                rreq->io_streams[0].sreq_max_segs);
 
                if (limit < rsize) {
@@ -36,9 +36,9 @@ static void netfs_prepare_dio_read_iterator(struct netfs_io_subrequest *subreq)
 
        trace_netfs_sreq(subreq, netfs_sreq_trace_prepare);
 
-       subreq->io_iter = rreq->iter;
+       subreq->io_iter = rreq->buffer.iter;
        iov_iter_truncate(&subreq->io_iter, subreq->len);
-       iov_iter_advance(&rreq->iter, subreq->len);
+       iov_iter_advance(&rreq->buffer.iter, subreq->len);
 }
 
 /*
@@ -199,15 +199,15 @@ ssize_t netfs_unbuffered_read_iter_locked(struct kiocb *iocb, struct iov_iter *i
         * the request.
         */
        if (user_backed_iter(iter)) {
-               ret = netfs_extract_user_iter(iter, rreq->len, &rreq->iter, 0);
+               ret = netfs_extract_user_iter(iter, rreq->len, &rreq->buffer.iter, 0);
                if (ret < 0)
                        goto out;
-               rreq->direct_bv = (struct bio_vec *)rreq->iter.bvec;
+               rreq->direct_bv = (struct bio_vec *)rreq->buffer.iter.bvec;
                rreq->direct_bv_count = ret;
                rreq->direct_bv_unpin = iov_iter_extract_will_pin(iter);
-               rreq->len = iov_iter_count(&rreq->iter);
+               rreq->len = iov_iter_count(&rreq->buffer.iter);
        } else {
-               rreq->iter = *iter;
+               rreq->buffer.iter = *iter;
                rreq->len = orig_count;
                rreq->direct_bv_unpin = false;
                iov_iter_advance(iter, orig_count);
diff --git a/fs/netfs/direct_write.c b/fs/netfs/direct_write.c
index 173e8b5e6a93..eded8afaa60b 100644
--- a/fs/netfs/direct_write.c
+++ b/fs/netfs/direct_write.c
@@ -68,19 +68,17 @@ ssize_t netfs_unbuffered_write_iter_locked(struct kiocb *iocb, struct iov_iter *
                 * request.
                 */
                if (async || user_backed_iter(iter)) {
-                       n = netfs_extract_user_iter(iter, len, &wreq->iter, 0);
+                       n = netfs_extract_user_iter(iter, len, &wreq->buffer.iter, 0);
                        if (n < 0) {
                                ret = n;
                                goto out;
                        }
-                       wreq->direct_bv = (struct bio_vec *)wreq->iter.bvec;
+                       wreq->direct_bv = (struct bio_vec *)wreq->buffer.iter.bvec;
                        wreq->direct_bv_count = n;
                        wreq->direct_bv_unpin = iov_iter_extract_will_pin(iter);
                } else {
-                       wreq->iter = *iter;
+                       wreq->buffer.iter = *iter;
                }
-
-               wreq->io_iter = wreq->iter;
        }
 
        __set_bit(NETFS_RREQ_USE_IO_ITER, &wreq->flags);
@@ -92,7 +90,7 @@ ssize_t netfs_unbuffered_write_iter_locked(struct kiocb *iocb, struct iov_iter *
        __set_bit(NETFS_RREQ_UPLOAD_TO_SERVER, &wreq->flags);
        if (async)
                wreq->iocb = iocb;
-       wreq->len = iov_iter_count(&wreq->io_iter);
+       wreq->len = iov_iter_count(&wreq->buffer.iter);
        wreq->cleanup = netfs_cleanup_dio_write;
        ret = netfs_unbuffered_write(wreq, is_sync_kiocb(iocb), wreq->len);
        if (ret < 0) {
diff --git a/fs/netfs/internal.h b/fs/netfs/internal.h
index 01b013f558f7..ccd9058acb61 100644
--- a/fs/netfs/internal.h
+++ b/fs/netfs/internal.h
@@ -60,10 +60,6 @@ static inline void netfs_proc_del_rreq(struct netfs_io_request *rreq) {}
  */
 struct folio_queue *netfs_buffer_make_space(struct netfs_io_request *rreq,
                                            enum netfs_folioq_trace trace);
-int netfs_buffer_append_folio(struct netfs_io_request *rreq, struct folio *folio,
-                             bool needs_put);
-struct folio_queue *netfs_delete_buffer_head(struct netfs_io_request *wreq);
-void netfs_clear_buffer(struct netfs_io_request *rreq);
 void netfs_reset_iter(struct netfs_io_subrequest *subreq);
 
 /*
diff --git a/fs/netfs/misc.c b/fs/netfs/misc.c
index afe032551de5..4249715f4171 100644
--- a/fs/netfs/misc.c
+++ b/fs/netfs/misc.c
@@ -8,153 +8,6 @@
 #include <linux/swap.h>
 #include "internal.h"
 
-/**
- * netfs_folioq_alloc - Allocate a folio_queue struct
- * @rreq_id: Associated debugging ID for tracing purposes
- * @gfp: Allocation constraints
- * @trace: Trace tag to indicate the purpose of the allocation
- *
- * Allocate, initialise and account the folio_queue struct and log a trace line
- * to mark the allocation.
- */
-struct folio_queue *netfs_folioq_alloc(unsigned int rreq_id, gfp_t gfp,
-                                      unsigned int /*enum netfs_folioq_trace*/ trace)
-{
-       static atomic_t debug_ids;
-       struct folio_queue *fq;
-
-       fq = kmalloc(sizeof(*fq), gfp);
-       if (fq) {
-               netfs_stat(&netfs_n_folioq);
-               folioq_init(fq, rreq_id);
-               fq->debug_id = atomic_inc_return(&debug_ids);
-               trace_netfs_folioq(fq, trace);
-       }
-       return fq;
-}
-EXPORT_SYMBOL(netfs_folioq_alloc);
-
-/**
- * netfs_folioq_free - Free a folio_queue struct
- * @folioq: The object to free
- * @trace: Trace tag to indicate which free
- *
- * Free and unaccount the folio_queue struct.
- */
-void netfs_folioq_free(struct folio_queue *folioq,
-                      unsigned int /*enum netfs_trace_folioq*/ trace)
-{
-       trace_netfs_folioq(folioq, trace);
-       netfs_stat_d(&netfs_n_folioq);
-       kfree(folioq);
-}
-EXPORT_SYMBOL(netfs_folioq_free);
-
-/*
- * Make sure there's space in the rolling queue.
- */
-struct folio_queue *netfs_buffer_make_space(struct netfs_io_request *rreq,
-                                           enum netfs_folioq_trace trace)
-{
-       struct folio_queue *tail = rreq->buffer_tail, *prev;
-       unsigned int prev_nr_slots = 0;
-
-       if (WARN_ON_ONCE(!rreq->buffer && tail) ||
-           WARN_ON_ONCE(rreq->buffer && !tail))
-               return ERR_PTR(-EIO);
-
-       prev = tail;
-       if (prev) {
-               if (!folioq_full(tail))
-                       return tail;
-               prev_nr_slots = folioq_nr_slots(tail);
-       }
-
-       tail = netfs_folioq_alloc(rreq->debug_id, GFP_NOFS, trace);
-       if (!tail)
-               return ERR_PTR(-ENOMEM);
-       tail->prev = prev;
-       if (prev)
-               /* [!] NOTE: After we set prev->next, the consumer is entirely
-                * at liberty to delete prev.
-                */
-               WRITE_ONCE(prev->next, tail);
-
-       rreq->buffer_tail = tail;
-       if (!rreq->buffer) {
-               rreq->buffer = tail;
-               iov_iter_folio_queue(&rreq->io_iter, ITER_SOURCE, tail, 0, 0, 0);
-       } else {
-               /* Make sure we don't leave the master iterator pointing to a
-                * block that might get immediately consumed.
-                */
-               if (rreq->io_iter.folioq == prev &&
-                   rreq->io_iter.folioq_slot == prev_nr_slots) {
-                       rreq->io_iter.folioq = tail;
-                       rreq->io_iter.folioq_slot = 0;
-               }
-       }
-       rreq->buffer_tail_slot = 0;
-       return tail;
-}
-
-/*
- * Append a folio to the rolling queue.
- */
-int netfs_buffer_append_folio(struct netfs_io_request *rreq, struct folio *folio,
-                             bool needs_put)
-{
-       struct folio_queue *tail;
-       unsigned int slot, order = folio_order(folio);
-
-       tail = netfs_buffer_make_space(rreq, netfs_trace_folioq_alloc_append_folio);
-       if (IS_ERR(tail))
-               return PTR_ERR(tail);
-
-       rreq->io_iter.count += PAGE_SIZE << order;
-
-       slot = folioq_append(tail, folio);
-       /* Store the counter after setting the slot. */
-       smp_store_release(&rreq->buffer_tail_slot, slot);
-       return 0;
-}
-
-/*
- * Delete the head of a rolling queue.
- */
-struct folio_queue *netfs_delete_buffer_head(struct netfs_io_request *wreq)
-{
-       struct folio_queue *head = wreq->buffer, *next = head->next;
-
-       if (next)
-               next->prev = NULL;
-       netfs_folioq_free(head, netfs_trace_folioq_delete);
-       wreq->buffer = next;
-       return next;
-}
-
-/*
- * Clear out a rolling queue.
- */
-void netfs_clear_buffer(struct netfs_io_request *rreq)
-{
-       struct folio_queue *p;
-
-       while ((p = rreq->buffer)) {
-               rreq->buffer = p->next;
-               for (int slot = 0; slot < folioq_count(p); slot++) {
-                       struct folio *folio = folioq_folio(p, slot);
-                       if (!folio)
-                               continue;
-                       if (folioq_is_marked(p, slot)) {
-                               trace_netfs_folio(folio, netfs_folio_trace_put);
-                               folio_put(folio);
-                       }
-               }
-               netfs_folioq_free(p, netfs_trace_folioq_clear);
-       }
-}
-
 /*
  * Reset the subrequest iterator to refer just to the region remaining to be
  * read.  The iterator may or may not have been advanced by socket ops or
diff --git a/fs/netfs/objects.c b/fs/netfs/objects.c
index 31e388ec6e48..5cdddaf1f978 100644
--- a/fs/netfs/objects.c
+++ b/fs/netfs/objects.c
@@ -143,7 +143,7 @@ static void netfs_free_request(struct work_struct *work)
                }
                kvfree(rreq->direct_bv);
        }
-       netfs_clear_buffer(rreq);
+       rolling_buffer_clear(&rreq->buffer);
 
        if (atomic_dec_and_test(&ictx->io_count))
                wake_up_var(&ictx->io_count);
diff --git a/fs/netfs/read_pgpriv2.c b/fs/netfs/read_pgpriv2.c
index 54d5004fec18..9eee5af6b327 100644
--- a/fs/netfs/read_pgpriv2.c
+++ b/fs/netfs/read_pgpriv2.c
@@ -34,8 +34,9 @@ void netfs_pgpriv2_mark_copy_to_cache(struct netfs_io_subrequest *subreq,
  * [DEPRECATED] Cancel PG_private_2 on all marked folios in the event of an
  * unrecoverable error.
  */
-static void netfs_pgpriv2_cancel(struct folio_queue *folioq)
+static void netfs_pgpriv2_cancel(struct rolling_buffer *buffer)
 {
+       struct folio_queue *folioq = buffer->tail;
        struct folio *folio;
        int slot;
 
@@ -94,7 +95,7 @@ static int netfs_pgpriv2_copy_folio(struct netfs_io_request *wreq, struct folio
        trace_netfs_folio(folio, netfs_folio_trace_store_copy);
 
        /* Attach the folio to the rolling buffer. */
-       if (netfs_buffer_append_folio(wreq, folio, false) < 0)
+       if (rolling_buffer_append(&wreq->buffer, folio, 0) < 0)
                return -ENOMEM;
 
        cache->submit_extendable_to = fsize;
@@ -109,7 +110,7 @@ static int netfs_pgpriv2_copy_folio(struct netfs_io_request *wreq, struct folio
        do {
                ssize_t part;
 
-               wreq->io_iter.iov_offset = cache->submit_off;
+               wreq->buffer.iter.iov_offset = cache->submit_off;
 
                atomic64_set(&wreq->issued_to, fpos + cache->submit_off);
                cache->submit_extendable_to = fsize - cache->submit_off;
@@ -122,8 +123,8 @@ static int netfs_pgpriv2_copy_folio(struct netfs_io_request *wreq, struct folio
                        cache->submit_len -= part;
        } while (cache->submit_len > 0);
 
-       wreq->io_iter.iov_offset = 0;
-       iov_iter_advance(&wreq->io_iter, fsize);
+       wreq->buffer.iter.iov_offset = 0;
+       rolling_buffer_advance(&wreq->buffer, fsize);
        atomic64_set(&wreq->issued_to, fpos + fsize);
 
        if (flen < fsize)
@@ -151,7 +152,7 @@ void netfs_pgpriv2_write_to_the_cache(struct netfs_io_request *rreq)
                goto couldnt_start;
 
        /* Need the first folio to be able to set up the op. */
-       for (folioq = rreq->buffer; folioq; folioq = folioq->next) {
+       for (folioq = rreq->buffer.tail; folioq; folioq = folioq->next) {
                if (folioq->marks3) {
                        slot = __ffs(folioq->marks3);
                        break;
@@ -198,7 +199,7 @@ void netfs_pgpriv2_write_to_the_cache(struct netfs_io_request *rreq)
        netfs_put_request(wreq, false, netfs_rreq_trace_put_return);
        _leave(" = %d", error);
 couldnt_start:
-       netfs_pgpriv2_cancel(rreq->buffer);
+       netfs_pgpriv2_cancel(&rreq->buffer);
 }
 
 /*
@@ -207,13 +208,13 @@ void netfs_pgpriv2_write_to_the_cache(struct netfs_io_request *rreq)
  */
 bool netfs_pgpriv2_unlock_copied_folios(struct netfs_io_request *wreq)
 {
-       struct folio_queue *folioq = wreq->buffer;
+       struct folio_queue *folioq = wreq->buffer.tail;
        unsigned long long collected_to = wreq->collected_to;
-       unsigned int slot = wreq->buffer_head_slot;
+       unsigned int slot = wreq->buffer.first_tail_slot;
        bool made_progress = false;
 
        if (slot >= folioq_nr_slots(folioq)) {
-               folioq = netfs_delete_buffer_head(wreq);
+               folioq = rolling_buffer_delete_spent(&wreq->buffer);
                slot = 0;
        }
 
@@ -252,9 +253,9 @@ bool netfs_pgpriv2_unlock_copied_folios(struct netfs_io_request *wreq)
                folioq_clear(folioq, slot);
                slot++;
                if (slot >= folioq_nr_slots(folioq)) {
-                       if (READ_ONCE(wreq->buffer_tail) == folioq)
-                               break;
-                       folioq = netfs_delete_buffer_head(wreq);
+                       folioq = rolling_buffer_delete_spent(&wreq->buffer);
+                       if (!folioq)
+                               goto done;
                        slot = 0;
                }
 
@@ -262,7 +263,8 @@ bool netfs_pgpriv2_unlock_copied_folios(struct netfs_io_request *wreq)
                        break;
        }
 
-       wreq->buffer = folioq;
-       wreq->buffer_head_slot = slot;
+       wreq->buffer.tail = folioq;
+done:
+       wreq->buffer.first_tail_slot = slot;
        return made_progress;
 }
diff --git a/fs/netfs/read_retry.c b/fs/netfs/read_retry.c
index 21b4a54e545e..0983234c2183 100644
--- a/fs/netfs/read_retry.c
+++ b/fs/netfs/read_retry.c
@@ -245,7 +245,7 @@ void netfs_unlock_abandoned_read_pages(struct netfs_io_request *rreq)
 {
        struct folio_queue *p;
 
-       for (p = rreq->buffer; p; p = p->next) {
+       for (p = rreq->buffer.tail; p; p = p->next) {
                for (int slot = 0; slot < folioq_count(p); slot++) {
                        struct folio *folio = folioq_folio(p, slot);
 
diff --git a/fs/netfs/rolling_buffer.c b/fs/netfs/rolling_buffer.c
new file mode 100644
index 000000000000..75d97af14b4a
--- /dev/null
+++ b/fs/netfs/rolling_buffer.c
@@ -0,0 +1,226 @@
+// SPDX-License-Identifier: GPL-2.0-or-later
+/* Rolling buffer helpers
+ *
+ * Copyright (C) 2024 Red Hat, Inc. All Rights Reserved.
+ * Written by David Howells (dhowe...@redhat.com)
+ */
+
+#include <linux/bitops.h>
+#include <linux/pagemap.h>
+#include <linux/rolling_buffer.h>
+#include <linux/slab.h>
+#include "internal.h"
+
+static atomic_t debug_ids;
+
+/**
+ * netfs_folioq_alloc - Allocate a folio_queue struct
+ * @rreq_id: Associated debugging ID for tracing purposes
+ * @gfp: Allocation constraints
+ * @trace: Trace tag to indicate the purpose of the allocation
+ *
+ * Allocate, initialise and account the folio_queue struct and log a trace line
+ * to mark the allocation.
+ */
+struct folio_queue *netfs_folioq_alloc(unsigned int rreq_id, gfp_t gfp,
+                                      unsigned int /*enum netfs_folioq_trace*/ trace)
+{
+       struct folio_queue *fq;
+
+       fq = kmalloc(sizeof(*fq), gfp);
+       if (fq) {
+               netfs_stat(&netfs_n_folioq);
+               folioq_init(fq, rreq_id);
+               fq->debug_id = atomic_inc_return(&debug_ids);
+               trace_netfs_folioq(fq, trace);
+       }
+       return fq;
+}
+EXPORT_SYMBOL(netfs_folioq_alloc);
+
+/**
+ * netfs_folioq_free - Free a folio_queue struct
+ * @folioq: The object to free
+ * @trace: Trace tag to indicate which free
+ *
+ * Free and unaccount the folio_queue struct.
+ */
+void netfs_folioq_free(struct folio_queue *folioq,
+                      unsigned int /*enum netfs_trace_folioq*/ trace)
+{
+       trace_netfs_folioq(folioq, trace);
+       netfs_stat_d(&netfs_n_folioq);
+       kfree(folioq);
+}
+EXPORT_SYMBOL(netfs_folioq_free);
+
+/*
+ * Initialise a rolling buffer.  We allocate an empty folio queue struct so
+ * that the pointers can be independently driven by the producer and the
+ * consumer.
+ */
+int rolling_buffer_init(struct rolling_buffer *roll, unsigned int rreq_id,
+                       unsigned int direction)
+{
+       struct folio_queue *fq;
+
+       fq = netfs_folioq_alloc(rreq_id, GFP_NOFS, netfs_trace_folioq_rollbuf_init);
+       if (!fq)
+               return -ENOMEM;
+
+       roll->head = fq;
+       roll->tail = fq;
+       iov_iter_folio_queue(&roll->iter, direction, fq, 0, 0, 0);
+       return 0;
+}
+
+/*
+ * Add another folio_queue to a rolling buffer if there's no space left.
+ */
+int rolling_buffer_make_space(struct rolling_buffer *roll)
+{
+       struct folio_queue *fq, *head = roll->head;
+
+       if (!folioq_full(head))
+               return 0;
+
+       fq = netfs_folioq_alloc(head->rreq_id, GFP_NOFS, netfs_trace_folioq_make_space);
+       if (!fq)
+               return -ENOMEM;
+       fq->prev = head;
+
+       roll->head = fq;
+       if (folioq_full(head)) {
+               /* Make sure we don't leave the master iterator pointing to a
+                * block that might get immediately consumed.
+                */
+               if (roll->iter.folioq == head &&
+                   roll->iter.folioq_slot == folioq_nr_slots(head)) {
+                       roll->iter.folioq = fq;
+                       roll->iter.folioq_slot = 0;
+               }
+       }
+
+       /* Make sure the initialisation is stored before the next pointer.
+        *
+        * [!] NOTE: After we set head->next, the consumer is at liberty to
+        * immediately delete the old head.
+        */
+       smp_store_release(&head->next, fq);
+       return 0;
+}
+
+/*
+ * Decant the list of folios to read into a rolling buffer.
+ */
+ssize_t rolling_buffer_load_from_ra(struct rolling_buffer *roll,
+                                   struct readahead_control *ractl,
+                                   struct folio_batch *put_batch)
+{
+       struct folio_queue *fq;
+       struct page **vec;
+       int nr, ix, to;
+       ssize_t size = 0;
+
+       if (rolling_buffer_make_space(roll) < 0)
+               return -ENOMEM;
+
+       fq = roll->head;
+       vec = (struct page **)fq->vec.folios;
+       nr = __readahead_batch(ractl, vec + folio_batch_count(&fq->vec),
+                              folio_batch_space(&fq->vec));
+       ix = fq->vec.nr;
+       to = ix + nr;
+       fq->vec.nr = to;
+       for (; ix < to; ix++) {
+               struct folio *folio = folioq_folio(fq, ix);
+               unsigned int order = folio_order(folio);
+
+               fq->orders[ix] = order;
+               size += PAGE_SIZE << order;
+               trace_netfs_folio(folio, netfs_folio_trace_read);
+               if (!folio_batch_add(put_batch, folio))
+                       folio_batch_release(put_batch);
+       }
+       WRITE_ONCE(roll->iter.count, roll->iter.count + size);
+
+       /* Store the counter after setting the slot. */
+       smp_store_release(&roll->next_head_slot, to);
+
+       for (; ix < folioq_nr_slots(fq); ix++)
+               folioq_clear(fq, ix);
+
+       return size;
+}
+
+/*
+ * Append a folio to the rolling buffer.
+ */
+ssize_t rolling_buffer_append(struct rolling_buffer *roll, struct folio *folio,
+                             unsigned int flags)
+{
+       ssize_t size = folio_size(folio);
+       int slot;
+
+       if (rolling_buffer_make_space(roll) < 0)
+               return -ENOMEM;
+
+       slot = folioq_append(roll->head, folio);
+       if (flags & ROLLBUF_MARK_1)
+               folioq_mark(roll->head, slot);
+       if (flags & ROLLBUF_MARK_2)
+               folioq_mark2(roll->head, slot);
+
+       WRITE_ONCE(roll->iter.count, roll->iter.count + size);
+
+       /* Store the counter after setting the slot. */
+       smp_store_release(&roll->next_head_slot, slot);
+       return size;
+}
+
+/*
+ * Delete a spent buffer from a rolling queue and return the next in line.  We
+ * don't delete the last buffer, so as to keep the head and tail pointers
+ * independent, but return NULL instead.
+ */
+struct folio_queue *rolling_buffer_delete_spent(struct rolling_buffer *roll)
+{
+       struct folio_queue *spent = roll->tail, *next = READ_ONCE(spent->next);
+
+       if (!next)
+               return NULL;
+       next->prev = NULL;
+       netfs_folioq_free(spent, netfs_trace_folioq_delete);
+       roll->tail = next;
+       return next;
+}
+
+/*
+ * Clear out a rolling queue.  Folios that have mark 1 set are put.
+ */
+void rolling_buffer_clear(struct rolling_buffer *roll)
+{
+       struct folio_batch fbatch;
+       struct folio_queue *p;
+
+       folio_batch_init(&fbatch);
+
+       while ((p = roll->tail)) {
+               roll->tail = p->next;
+               for (int slot = 0; slot < folioq_count(p); slot++) {
+                       struct folio *folio = folioq_folio(p, slot);
+
+                       if (!folio)
+                               continue;
+                       if (folioq_is_marked(p, slot)) {
+                               trace_netfs_folio(folio, netfs_folio_trace_put);
+                               if (!folio_batch_add(&fbatch, folio))
+                                       folio_batch_release(&fbatch);
+                       }
+               }
+
+               netfs_folioq_free(p, netfs_trace_folioq_clear);
+       }
+
+       folio_batch_release(&fbatch);
+}
diff --git a/fs/netfs/write_collect.c b/fs/netfs/write_collect.c
index ca3a11ed9b54..364c1f9d5815 100644
--- a/fs/netfs/write_collect.c
+++ b/fs/netfs/write_collect.c
@@ -83,9 +83,9 @@ int netfs_folio_written_back(struct folio *folio)
 static void netfs_writeback_unlock_folios(struct netfs_io_request *wreq,
                                          unsigned int *notes)
 {
-       struct folio_queue *folioq = wreq->buffer;
+       struct folio_queue *folioq = wreq->buffer.tail;
        unsigned long long collected_to = wreq->collected_to;
-       unsigned int slot = wreq->buffer_head_slot;
+       unsigned int slot = wreq->buffer.first_tail_slot;
 
        if (wreq->origin == NETFS_PGPRIV2_COPY_TO_CACHE) {
                if (netfs_pgpriv2_unlock_copied_folios(wreq))
@@ -94,7 +94,9 @@ static void netfs_writeback_unlock_folios(struct netfs_io_request *wreq,
        }
 
        if (slot >= folioq_nr_slots(folioq)) {
-               folioq = netfs_delete_buffer_head(wreq);
+               folioq = rolling_buffer_delete_spent(&wreq->buffer);
+               if (!folioq)
+                       return;
                slot = 0;
        }
 
@@ -134,9 +136,9 @@ static void netfs_writeback_unlock_folios(struct netfs_io_request *wreq,
                folioq_clear(folioq, slot);
                slot++;
                if (slot >= folioq_nr_slots(folioq)) {
-                       if (READ_ONCE(wreq->buffer_tail) == folioq)
-                               break;
-                       folioq = netfs_delete_buffer_head(wreq);
+                       folioq = rolling_buffer_delete_spent(&wreq->buffer);
+                       if (!folioq)
+                               goto done;
                        slot = 0;
                }
 
@@ -144,8 +146,9 @@ static void netfs_writeback_unlock_folios(struct netfs_io_request *wreq,
                        break;
        }
 
-       wreq->buffer = folioq;
-       wreq->buffer_head_slot = slot;
+       wreq->buffer.tail = folioq;
+done:
+       wreq->buffer.first_tail_slot = slot;
 }
 
 /*
diff --git a/fs/netfs/write_issue.c b/fs/netfs/write_issue.c
index 87e5cf4a0957..88ceba49ff69 100644
--- a/fs/netfs/write_issue.c
+++ b/fs/netfs/write_issue.c
@@ -107,6 +107,8 @@ struct netfs_io_request *netfs_create_write_req(struct address_space *mapping,
        ictx = netfs_inode(wreq->inode);
        if (is_buffered && netfs_is_cache_enabled(ictx))
                fscache_begin_write_operation(&wreq->cache_resources, netfs_i_cookie(ictx));
+       if (rolling_buffer_init(&wreq->buffer, wreq->debug_id, ITER_SOURCE) < 0)
+               goto nomem;
 
        wreq->cleaned_to = wreq->start;
 
@@ -129,6 +131,10 @@ struct netfs_io_request *netfs_create_write_req(struct address_space *mapping,
        }
 
        return wreq;
+nomem:
+       wreq->error = -ENOMEM;
+       netfs_put_request(wreq, false, netfs_rreq_trace_put_failed);
+       return ERR_PTR(-ENOMEM);
 }
 
 /**
@@ -153,16 +159,15 @@ static void netfs_prepare_write(struct netfs_io_request *wreq,
                                loff_t start)
 {
        struct netfs_io_subrequest *subreq;
-       struct iov_iter *wreq_iter = &wreq->io_iter;
+       struct iov_iter *wreq_iter = &wreq->buffer.iter;
 
        /* Make sure we don't point the iterator at a used-up folio_queue
         * struct being used as a placeholder to prevent the queue from
         * collapsing.  In such a case, extend the queue.
         */
        if (iov_iter_is_folioq(wreq_iter) &&
-           wreq_iter->folioq_slot >= folioq_nr_slots(wreq_iter->folioq)) {
-               netfs_buffer_make_space(wreq, netfs_trace_folioq_prep_write);
-       }
+           wreq_iter->folioq_slot >= folioq_nr_slots(wreq_iter->folioq))
+               rolling_buffer_make_space(&wreq->buffer);
 
        subreq = netfs_alloc_subrequest(wreq);
        subreq->source          = stream->source;
@@ -327,6 +332,9 @@ static int netfs_write_folio(struct netfs_io_request *wreq,
 
        _enter("");
 
+       if (rolling_buffer_make_space(&wreq->buffer) < 0)
+               return -ENOMEM;
+
        /* netfs_perform_write() may shift i_size around the page or from out
         * of the page to beyond it, but cannot move i_size into or through the
         * page since we have it locked.
@@ -431,7 +439,7 @@ static int netfs_write_folio(struct netfs_io_request *wreq,
        }
 
        /* Attach the folio to the rolling buffer. */
-       netfs_buffer_append_folio(wreq, folio, false);
+       rolling_buffer_append(&wreq->buffer, folio, 0);
 
        /* Move the submission point forward to allow for write-streaming data
         * not starting at the front of the page.  We don't do write-streaming
@@ -478,7 +486,7 @@ static int netfs_write_folio(struct netfs_io_request *wreq,
 
                /* Advance the iterator(s). */
                if (stream->submit_off > iter_off) {
-                       iov_iter_advance(&wreq->io_iter, stream->submit_off - iter_off);
+                       rolling_buffer_advance(&wreq->buffer, stream->submit_off - iter_off);
                        iter_off = stream->submit_off;
                }
 
@@ -496,7 +504,7 @@ static int netfs_write_folio(struct netfs_io_request *wreq,
        }
 
        if (fsize > iter_off)
-               iov_iter_advance(&wreq->io_iter, fsize - iter_off);
+               rolling_buffer_advance(&wreq->buffer, fsize - iter_off);
        atomic64_set(&wreq->issued_to, fpos + fsize);
 
        if (!debug)
@@ -635,7 +643,7 @@ int netfs_advance_writethrough(struct netfs_io_request *wreq, struct writeback_c
                               struct folio **writethrough_cache)
 {
        _enter("R=%x ic=%zu ws=%u cp=%zu tp=%u",
-              wreq->debug_id, wreq->iter.count, wreq->wsize, copied, to_page_end);
+              wreq->debug_id, wreq->buffer.iter.count, wreq->wsize, copied, to_page_end);
 
        if (!*writethrough_cache) {
                if (folio_test_dirty(folio))
@@ -710,7 +718,7 @@ int netfs_unbuffered_write(struct netfs_io_request *wreq, bool may_wait, size_t
                part = netfs_advance_write(wreq, upload, start, len, false);
                start += part;
                len -= part;
-               iov_iter_advance(&wreq->io_iter, part);
+               rolling_buffer_advance(&wreq->buffer, part);
                if (test_bit(NETFS_RREQ_PAUSE, &wreq->flags)) {
                        trace_netfs_rreq(wreq, netfs_rreq_trace_wait_pause);
                        wait_on_bit(&wreq->flags, NETFS_RREQ_PAUSE, TASK_UNINTERRUPTIBLE);
diff --git a/include/linux/netfs.h b/include/linux/netfs.h
index 5b2f427f8e3e..bd922f0936e3 100644
--- a/include/linux/netfs.h
+++ b/include/linux/netfs.h
@@ -18,6 +18,7 @@
 #include <linux/fs.h>
 #include <linux/pagemap.h>
 #include <linux/uio.h>
+#include <linux/rolling_buffer.h>
 
 enum netfs_sreq_ref_trace;
 typedef struct mempool_s mempool_t;
@@ -238,10 +239,9 @@ struct netfs_io_request {
        struct netfs_io_stream  io_streams[2];  /* Streams of parallel I/O operations */
 #define NR_IO_STREAMS 2 //wreq->nr_io_streams
        struct netfs_group      *group;         /* Writeback group being written back */
-       struct folio_queue      *buffer;        /* Head of I/O buffer */
-       struct folio_queue      *buffer_tail;   /* Tail of I/O buffer */
-       struct iov_iter         iter;           /* Unencrypted-side iterator */
-       struct iov_iter         io_iter;        /* I/O (Encrypted-side) iterator */
+       struct rolling_buffer   buffer;         /* Unencrypted buffer */
+#define NETFS_ROLLBUF_PUT_MARK         ROLLBUF_MARK_1
+#define NETFS_ROLLBUF_PAGECACHE_MARK   ROLLBUF_MARK_2
        void                    *netfs_priv;    /* Private data for the netfs */
        void                    *netfs_priv2;   /* Private data for the netfs */
        struct bio_vec          *direct_bv;     /* DIO buffer list (when handling iovec-iter) */
@@ -259,8 +259,6 @@ struct netfs_io_request {
        long                    error;          /* 0 or error that occurred */
        enum netfs_io_origin    origin;         /* Origin of the request */
        bool                    direct_bv_unpin; /* T if direct_bv[] must be unpinned */
-       u8                      buffer_head_slot; /* First slot in ->buffer */
-       u8                      buffer_tail_slot; /* Next slot in ->buffer_tail */
        unsigned long long      i_size;         /* Size of the file */
        unsigned long long      start;          /* Start position */
        atomic64_t              issued_to;      /* Write issuer folio cursor */
diff --git a/include/linux/rolling_buffer.h b/include/linux/rolling_buffer.h
new file mode 100644
index 000000000000..ac15b1ffdd83
--- /dev/null
+++ b/include/linux/rolling_buffer.h
@@ -0,0 +1,61 @@
+/* SPDX-License-Identifier: GPL-2.0-or-later */
+/* Rolling buffer of folios
+ *
+ * Copyright (C) 2024 Red Hat, Inc. All Rights Reserved.
+ * Written by David Howells (dhowe...@redhat.com)
+ */
+
+#ifndef _ROLLING_BUFFER_H
+#define _ROLLING_BUFFER_H
+
+#include <linux/folio_queue.h>
+#include <linux/uio.h>
+
+/*
+ * Rolling buffer.  Whilst the buffer is live and in use, folios and folio
+ * queue segments can be added to one end by one thread and removed from the
+ * other end by another thread.  The buffer isn't allowed to be empty; it must
+ * always have at least one folio_queue in it so that neither side has to
+ * modify both queue pointers.
+ *
+ * The iterator in the buffer is extended as buffers are inserted.  It can be
+ * snapshotted to use a segment of the buffer.
+ */
+struct rolling_buffer {
+       struct folio_queue      *head;          /* Producer's insertion point */
+       struct folio_queue      *tail;          /* Consumer's removal point */
+       struct iov_iter         iter;           /* Iterator tracking what's left in the buffer */
+       u8                      next_head_slot; /* Next slot in ->head */
+       u8                      first_tail_slot; /* First slot in ->tail */
+};
+
+/*
+ * Snapshot of a rolling buffer.
+ */
+struct rolling_buffer_snapshot {
+       struct folio_queue      *curr_folioq;   /* Queue segment in which current folio resides */
+       unsigned char           curr_slot;      /* Folio currently being read */
+       unsigned char           curr_order;     /* Order of folio */
+};
+
+/* Marks to store per-folio in the internal folio_queue structs. */
+#define ROLLBUF_MARK_1 BIT(0)
+#define ROLLBUF_MARK_2 BIT(1)
+
+int rolling_buffer_init(struct rolling_buffer *roll, unsigned int rreq_id,
+                       unsigned int direction);
+int rolling_buffer_make_space(struct rolling_buffer *roll);
+ssize_t rolling_buffer_load_from_ra(struct rolling_buffer *roll,
+                                   struct readahead_control *ractl,
+                                   struct folio_batch *put_batch);
+ssize_t rolling_buffer_append(struct rolling_buffer *roll, struct folio *folio,
+                             unsigned int flags);
+struct folio_queue *rolling_buffer_delete_spent(struct rolling_buffer *roll);
+void rolling_buffer_clear(struct rolling_buffer *roll);
+
+static inline void rolling_buffer_advance(struct rolling_buffer *roll, size_t amount)
+{
+       iov_iter_advance(&roll->iter, amount);
+}
+
+#endif /* _ROLLING_BUFFER_H */
diff --git a/include/trace/events/netfs.h b/include/trace/events/netfs.h
index 50aa6745df95..2dfc9f716e3b 100644
--- a/include/trace/events/netfs.h
+++ b/include/trace/events/netfs.h
@@ -198,7 +198,9 @@
        EM(netfs_trace_folioq_alloc_read_sing,  "alloc-r-sing") \
        EM(netfs_trace_folioq_clear,            "clear")        \
        EM(netfs_trace_folioq_delete,           "delete")       \
+       EM(netfs_trace_folioq_make_space,       "make-space")   \
        EM(netfs_trace_folioq_prep_write,       "prep-wr")      \
+       EM(netfs_trace_folioq_rollbuf_init,     "roll-init")    \
        E_(netfs_trace_folioq_read_progress,    "r-progress")
 
 #ifndef __NETFS_DECLARE_TRACE_ENUMS_ONCE_ONLY
