On 9/24/19 1:06 AM, Pavel Begunkov wrote:
> On 24/09/2019 02:00, Jens Axboe wrote:
>>> I think we can do the same thing, just wrapping the waitqueue in a
>>> structure with a count in it, on the stack. Got some flight time
>>> coming up later today, let me try and cook up a patch.
>>
>> Totally untested, and sent out 5 min before departure... But something
>> like this.
> Hmm, this reminds me of my first version. Basically that's the same thing
> but with the macros inlined. I wanted to make it reusable and
> self-contained, though.
> 
> If you don't think it could be useful in other places, sure, we could do
> something like that. Is that so?

I totally agree it could be useful in other places. Maybe formalized and
used with wake_up_nr() instead of adding a new primitive? Haven't looked
into that, I may be talking nonsense.

In any case, I did get a chance to test it and it works for me. Here's
the "finished" version, slightly cleaned up and with a comment added
for good measure.


diff --git a/fs/io_uring.c b/fs/io_uring.c
index ca7570aca430..14fae454cf75 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -2768,6 +2768,42 @@ static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit,
        return submit;
 }
 
+struct io_wait_queue {
+       struct wait_queue_entry wq;
+       struct io_ring_ctx *ctx;
+       struct task_struct *task;
+       unsigned to_wait;
+       unsigned nr_timeouts;
+};
+
+static inline bool io_should_wake(struct io_wait_queue *iowq)
+{
+       struct io_ring_ctx *ctx = iowq->ctx;
+
+       /*
+        * Wake up if we have enough events, or if a timeout occurred since we
+        * started waiting. For timeouts, we always want to return to userspace,
+        * regardless of event count.
+        */
+       return io_cqring_events(ctx->rings) >= iowq->to_wait ||
+                       atomic_read(&ctx->cq_timeouts) != iowq->nr_timeouts;
+}
+
+static int io_wake_function(struct wait_queue_entry *curr, unsigned int mode,
+                           int wake_flags, void *key)
+{
+       struct io_wait_queue *iowq = container_of(curr, struct io_wait_queue,
+                                                       wq);
+
+       if (io_should_wake(iowq)) {
+               list_del_init(&curr->entry);
+               wake_up_process(iowq->task);
+               return 1;
+       }
+
+       return -1;
+}
+
 /*
  * Wait until events become available, if we don't already have some. The
  * application must reap them itself, as they reside on the shared cq ring.
@@ -2775,8 +2811,16 @@ static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit,
 static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
                          const sigset_t __user *sig, size_t sigsz)
 {
+       struct io_wait_queue iowq = {
+               .wq = {
+                       .func   = io_wake_function,
+                       .entry  = LIST_HEAD_INIT(iowq.wq.entry),
+               },
+               .task           = current,
+               .ctx            = ctx,
+               .to_wait        = min_events,
+       };
        struct io_rings *rings = ctx->rings;
-       unsigned nr_timeouts;
        int ret;
 
        if (io_cqring_events(rings) >= min_events)
@@ -2795,15 +2839,16 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
                        return ret;
        }
 
-       nr_timeouts = atomic_read(&ctx->cq_timeouts);
-       /*
-        * Return if we have enough events, or if a timeout occured since
-        * we started waiting. For timeouts, we always want to return to
-        * userspace.
-        */
-       ret = wait_event_interruptible(ctx->wait,
-                               io_cqring_events(rings) >= min_events ||
-                               atomic_read(&ctx->cq_timeouts) != nr_timeouts);
+       iowq.nr_timeouts = atomic_read(&ctx->cq_timeouts);
+       prepare_to_wait_exclusive(&ctx->wait, &iowq.wq, TASK_INTERRUPTIBLE);
+       do {
+               if (io_should_wake(&iowq))
+                       break;
+               schedule();
+               set_current_state(TASK_INTERRUPTIBLE);
+       } while (1);
+       finish_wait(&ctx->wait, &iowq.wq);
+
        restore_saved_sigmask_unless(ret == -ERESTARTSYS);
        if (ret == -ERESTARTSYS)
                ret = -EINTR;

-- 
Jens Axboe

Reply via email to