If the file is flagged with FMODE_BUF_RASYNC, then we don't have to punt
the buffered read to an io-wq worker. Instead we can rely on page unlock
callbacks to support retry-based async IO. This is a lot more efficient
than doing async thread offload.

The retry is done similarly to how we handle poll-based retry: from the
page unlock callback, we simply queue the retry to a task_work-based
handler, which re-issues the request from the submitting task's context.
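
For reference, below is a minimal liburing-based example (not part of this
patch) of a buffered read that can take advantage of this retry path. The
file name, queue depth, and buffer size are illustrative assumptions; all
that matters is a buffered (non-O_DIRECT) read on a file whose filesystem
sets FMODE_BUF_RASYNC:

	#include <fcntl.h>
	#include <stdio.h>
	#include <liburing.h>

	int main(void)
	{
		struct io_uring ring;
		struct io_uring_sqe *sqe;
		struct io_uring_cqe *cqe;
		char buf[4096];
		int fd, ret;

		/* buffered read: no O_DIRECT, so the page cache is used */
		fd = open("testfile", O_RDONLY);
		if (fd < 0)
			return 1;
		if (io_uring_queue_init(8, &ring, 0) < 0)
			return 1;

		/* queue a single 4k read at offset 0 */
		sqe = io_uring_get_sqe(&ring);
		io_uring_prep_read(sqe, fd, buf, sizeof(buf), 0);
		io_uring_submit(&ring);

		/* wait for the completion and reap it */
		ret = io_uring_wait_cqe(&ring, &cqe);
		if (!ret)
			printf("read returned %d\n", cqe->res);
		io_uring_cqe_seen(&ring, cqe);
		io_uring_queue_exit(&ring);
		return 0;
	}

With this patch, if the pages aren't ready at submission time, the
completion for such a read is driven by the page unlock callback and
task_work rather than by punting to an io-wq worker thread.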

Signed-off-by: Jens Axboe <ax...@kernel.dk>
---
 fs/io_uring.c | 102 ++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 102 insertions(+)

diff --git a/fs/io_uring.c b/fs/io_uring.c
index e95481c552ff..9eeae10db648 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -498,6 +498,8 @@ struct io_async_rw {
        struct iovec                    *iov;
        ssize_t                         nr_segs;
        ssize_t                         size;
+       struct wait_page_async          wait;
+       struct callback_head            task_work;
 };
 
 struct io_async_ctx {
@@ -2568,6 +2570,102 @@ static int io_read_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe,
        return 0;
 }
 
+static void io_async_buf_cancel(struct callback_head *cb)
+{
+       struct io_async_rw *rw;
+       struct io_ring_ctx *ctx;
+       struct io_kiocb *req;
+
+       rw = container_of(cb, struct io_async_rw, task_work);
+       req = rw->wait.wait.private;
+       ctx = req->ctx;
+
+       spin_lock_irq(&ctx->completion_lock);
+       io_cqring_fill_event(req, -ECANCELED);
+       io_commit_cqring(ctx);
+       spin_unlock_irq(&ctx->completion_lock);
+
+       io_cqring_ev_posted(ctx);
+       req_set_fail_links(req);
+       io_double_put_req(req);
+}
+
+static void io_async_buf_retry(struct callback_head *cb)
+{
+       struct io_async_rw *rw;
+       struct io_ring_ctx *ctx;
+       struct io_kiocb *req;
+
+       rw = container_of(cb, struct io_async_rw, task_work);
+       req = rw->wait.wait.private;
+       ctx = req->ctx;
+
+       __set_current_state(TASK_RUNNING);
+       mutex_lock(&ctx->uring_lock);
+       __io_queue_sqe(req, NULL);
+       mutex_unlock(&ctx->uring_lock);
+}
+
+static int io_async_buf_func(struct wait_queue_entry *wait, unsigned mode,
+                            int sync, void *arg)
+{
+       struct wait_page_async *wp;
+       struct io_kiocb *req = wait->private;
+       struct io_async_rw *rw = &req->io->rw;
+       struct wait_page_key *key = arg;
+       struct task_struct *tsk;
+       int ret;
+
+       wp = container_of(wait, struct wait_page_async, wait);
+       if (wp->key.page != key->page)
+               return 0;
+       key->page_match = 1;
+       if (wp->key.bit_nr != key->bit_nr)
+               return 0;
+       if (test_bit(PG_locked, &key->page->flags))
+               return -1;
+
+       list_del_init(&wait->entry);
+
+       init_task_work(&rw->task_work, io_async_buf_retry);
+       /* submit ref gets dropped, acquire a new one */
+       refcount_inc(&req->refs);
+       tsk = req->task;
+       ret = task_work_add(tsk, &rw->task_work, true);
+       if (unlikely(ret)) {
+               /* queue just for cancelation */
+               init_task_work(&rw->task_work, io_async_buf_cancel);
+               tsk = io_wq_get_task(req->ctx->io_wq);
+               task_work_add(tsk, &rw->task_work, true);
+       }
+       wake_up_process(tsk);
+       return 1;
+}
+
+static bool io_rw_should_retry(struct io_kiocb *req)
+{
+       struct kiocb *kiocb = &req->rw.kiocb;
+       int ret;
+
+       /* already tried, or we're doing O_DIRECT */
+       if (kiocb->ki_flags & (IOCB_DIRECT | IOCB_WAITQ))
+               return false;
+       /*
+        * just use poll if we can, and don't attempt if the fs doesn't
+        * support callback based unlocks
+        */
+       if (file_can_poll(req->file) || !(req->file->f_mode & FMODE_BUF_RASYNC))
+               return false;
+
+       ret = kiocb_wait_page_async_init(kiocb, &req->io->rw.wait,
+                                               io_async_buf_func, req);
+       if (ret)
+               return false;
+       get_task_struct(current);
+       req->task = current;
+       return true;
+}
+
 static int io_read(struct io_kiocb *req, bool force_nonblock)
 {
        struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
@@ -2601,6 +2699,7 @@ static int io_read(struct io_kiocb *req, bool force_nonblock)
        if (!ret) {
                ssize_t ret2;
 
+retry:
                if (req->file->f_op->read_iter)
                        ret2 = call_read_iter(req->file, kiocb, &iter);
                else
@@ -2619,6 +2718,9 @@ static int io_read(struct io_kiocb *req, bool force_nonblock)
                        if (!(req->flags & REQ_F_NOWAIT) &&
                            !file_can_poll(req->file))
                                req->flags |= REQ_F_MUST_PUNT;
+                       if (io_rw_should_retry(req))
+                               goto retry;
+                       kiocb->ki_flags &= ~IOCB_WAITQ;
                        return -EAGAIN;
                }
        }
-- 
2.26.2
