In the enqueue path, we can't complete request, otherwise "Co-routine re-entered recursively" may be caused, so this patch fixes the issue with below ideas:
- for -EAGAIN, retry the submission in an introduced event handler - for part of completion, just update the io queue, since it is moving on after all - for other failure, return the failure if in enqueue path, otherwise, abort all queued I/O Signed-off-by: Ming Lei <ming....@canonical.com> --- block/linux-aio.c | 85 +++++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 63 insertions(+), 22 deletions(-) diff --git a/block/linux-aio.c b/block/linux-aio.c index 4867369..6f7dd51 100644 --- a/block/linux-aio.c +++ b/block/linux-aio.c @@ -51,6 +51,7 @@ struct qemu_laio_state { /* io queue for submit at batch */ LaioQueue io_q; + EventNotifier retry; /* handle -EAGAIN */ }; static inline ssize_t io_event_ret(struct io_event *ev) @@ -154,45 +155,78 @@ static void ioq_init(LaioQueue *io_q) io_q->plugged = 0; } -static int ioq_submit(struct qemu_laio_state *s) +static void abort_queue(struct qemu_laio_state *s) +{ + int i; + for (i = 0; i < s->io_q.idx; i++) { + struct qemu_laiocb *laiocb = container_of(s->io_q.iocbs[i], + struct qemu_laiocb, + iocb); + laiocb->ret = -EIO; + qemu_laio_process_completion(s, laiocb); + } +} + +static int ioq_submit(struct qemu_laio_state *s, bool enqueue) { int ret, i = 0; int len = s->io_q.idx; + int j = 0; - do { - ret = io_submit(s->ctx, len, s->io_q.iocbs); - } while (i++ < 3 && ret == -EAGAIN); + if (!len) { + return 0; + } - /* empty io queue */ - s->io_q.idx = 0; + ret = io_submit(s->ctx, len, s->io_q.iocbs); + if (ret == -EAGAIN) { + event_notifier_set(&s->retry); + return 0; + } else if (ret < 0) { + if (enqueue) + return ret; + + /* in non-queue path, all IOs have to be completed */ + abort_queue(s); + ret = len; + } else if (ret == 0) { + goto out; + } - if (ret < 0) { - i = 0; - } else { - i = ret; + for (i = ret; i < len; i++) { + s->io_q.iocbs[j++] = s->io_q.iocbs[i]; } - for (; i < len; i++) { - struct qemu_laiocb *laiocb = - container_of(s->io_q.iocbs[i], struct qemu_laiocb, iocb); + out: + /* update io queue */ + s->io_q.idx -= ret; - laiocb->ret = (ret < 0) ? ret : -EIO; - qemu_laio_process_completion(s, laiocb); - } return ret; } -static void ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb) +static void ioq_submit_retry(EventNotifier *e) +{ + struct qemu_laio_state *s = container_of(e, struct qemu_laio_state, retry); + + event_notifier_test_and_clear(e); + ioq_submit(s, false); +} + +static int ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb) { unsigned int idx = s->io_q.idx; + if (unlikely(idx == s->io_q.size)) + return -1; + s->io_q.iocbs[idx++] = iocb; s->io_q.idx = idx; - /* submit immediately if queue is full */ - if (idx == s->io_q.size) { - ioq_submit(s); + /* submit immediately if queue depth is above 2/3 */ + if (idx > s->io_q.size * 2 / 3) { + return ioq_submit(s, true); } + + return 0; } void laio_io_plug(BlockDriverState *bs, void *aio_ctx) @@ -214,7 +248,7 @@ int laio_io_unplug(BlockDriverState *bs, void *aio_ctx, bool unplug) } if (s->io_q.idx > 0) { - ret = ioq_submit(s); + ret = ioq_submit(s, false); } return ret; @@ -258,7 +292,8 @@ BlockDriverAIOCB *laio_submit(BlockDriverState *bs, void *aio_ctx, int fd, goto out_free_aiocb; } } else { - ioq_enqueue(s, iocbs); + if (ioq_enqueue(s, iocbs) < 0) + goto out_free_aiocb; } return &laiocb->common; @@ -272,6 +307,7 @@ void laio_detach_aio_context(void *s_, AioContext *old_context) struct qemu_laio_state *s = s_; aio_set_event_notifier(old_context, &s->e, NULL); + aio_set_event_notifier(old_context, &s->retry, NULL); } void laio_attach_aio_context(void *s_, AioContext *new_context) @@ -279,6 +315,7 @@ void laio_attach_aio_context(void *s_, AioContext *new_context) struct qemu_laio_state *s = s_; aio_set_event_notifier(new_context, &s->e, qemu_laio_completion_cb); + aio_set_event_notifier(new_context, &s->retry, ioq_submit_retry); } void *laio_init(void) @@ -295,6 +332,9 @@ void *laio_init(void) } ioq_init(&s->io_q); + if (event_notifier_init(&s->retry, false) < 0) { + goto out_close_efd; + } return s; @@ -310,5 +350,6 @@ void laio_cleanup(void *s_) struct qemu_laio_state *s = s_; event_notifier_cleanup(&s->e); + event_notifier_cleanup(&s->retry); g_free(s); } -- 1.7.9.5