In the enqueue path, we must not complete a request directly; otherwise a "Co-routine re-entered recursively" error may be triggered. This patch fixes the issue with the following ideas:
- for -EAGAIN or partial completion, retry the submission by an introduced event handler - for part of completion, also update the io queue - for other failure, return the failure if in enqueue path, otherwise, abort all queued I/O Signed-off-by: Ming Lei <ming....@canonical.com> --- block/linux-aio.c | 90 ++++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 68 insertions(+), 22 deletions(-) diff --git a/block/linux-aio.c b/block/linux-aio.c index 7ac7e8c..5eb9c92 100644 --- a/block/linux-aio.c +++ b/block/linux-aio.c @@ -51,6 +51,7 @@ struct qemu_laio_state { /* io queue for submit at batch */ LaioQueue io_q; + EventNotifier retry; /* handle -EAGAIN and partial completion */ }; static inline ssize_t io_event_ret(struct io_event *ev) @@ -154,45 +155,80 @@ static void ioq_init(LaioQueue *io_q) io_q->plugged = 0; } -static int ioq_submit(struct qemu_laio_state *s) +static void abort_queue(struct qemu_laio_state *s) +{ + int i; + for (i = 0; i < s->io_q.idx; i++) { + struct qemu_laiocb *laiocb = container_of(s->io_q.iocbs[i], + struct qemu_laiocb, + iocb); + laiocb->ret = -EIO; + qemu_laio_process_completion(s, laiocb); + } +} + +static int ioq_submit(struct qemu_laio_state *s, bool enqueue) { int ret, i = 0; int len = s->io_q.idx; + int j = 0; - do { - ret = io_submit(s->ctx, len, s->io_q.iocbs); - } while (i++ < 3 && ret == -EAGAIN); + if (!len) { + return 0; + } - /* empty io queue */ - s->io_q.idx = 0; + ret = io_submit(s->ctx, len, s->io_q.iocbs); + if (ret == -EAGAIN) { + event_notifier_set(&s->retry); + return 0; + } else if (ret < 0) { + if (enqueue) { + return ret; + } - if (ret < 0) { - i = 0; - } else { - i = ret; + /* in non-queue path, all IOs have to be completed */ + abort_queue(s); + ret = len; + } else if (ret == 0) { + goto out; } - for (; i < len; i++) { - struct qemu_laiocb *laiocb = - container_of(s->io_q.iocbs[i], struct qemu_laiocb, iocb); - - laiocb->ret = (ret < 0) ? 
ret : -EIO; - qemu_laio_process_completion(s, laiocb); + for (i = ret; i < len; i++) { + s->io_q.iocbs[j++] = s->io_q.iocbs[i]; } + + out: + /* update io queue */ + s->io_q.idx -= ret; + return ret; } -static void ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb) +static void ioq_submit_retry(EventNotifier *e) +{ + struct qemu_laio_state *s = container_of(e, struct qemu_laio_state, retry); + + event_notifier_test_and_clear(e); + ioq_submit(s, false); +} + +static int ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb) { unsigned int idx = s->io_q.idx; + if (unlikely(idx == s->io_q.size)) { + return -1; + } + s->io_q.iocbs[idx++] = iocb; s->io_q.idx = idx; - /* submit immediately if queue is full */ - if (idx == s->io_q.size) { - ioq_submit(s); + /* submit immediately if queue depth is above 2/3 */ + if (idx > s->io_q.size * 2 / 3) { + return ioq_submit(s, true); } + + return 0; } void laio_io_plug(BlockDriverState *bs, void *aio_ctx) @@ -214,7 +250,7 @@ int laio_io_unplug(BlockDriverState *bs, void *aio_ctx, bool unplug) } if (s->io_q.idx > 0) { - ret = ioq_submit(s); + ret = ioq_submit(s, false); } return ret; @@ -258,7 +294,9 @@ BlockDriverAIOCB *laio_submit(BlockDriverState *bs, void *aio_ctx, int fd, goto out_free_aiocb; } } else { - ioq_enqueue(s, iocbs); + if (ioq_enqueue(s, iocbs) < 0) { + goto out_free_aiocb; + } } return &laiocb->common; @@ -272,6 +310,7 @@ void laio_detach_aio_context(void *s_, AioContext *old_context) struct qemu_laio_state *s = s_; aio_set_event_notifier(old_context, &s->e, NULL); + aio_set_event_notifier(old_context, &s->retry, NULL); } void laio_attach_aio_context(void *s_, AioContext *new_context) @@ -279,6 +318,7 @@ void laio_attach_aio_context(void *s_, AioContext *new_context) struct qemu_laio_state *s = s_; aio_set_event_notifier(new_context, &s->e, qemu_laio_completion_cb); + aio_set_event_notifier(new_context, &s->retry, ioq_submit_retry); } void *laio_init(void) @@ -295,9 +335,14 @@ void *laio_init(void) } 
ioq_init(&s->io_q); + if (event_notifier_init(&s->retry, false) < 0) { + goto out_notifer_init; + } return s; +out_notifer_init: + io_destroy(s->ctx); out_close_efd: event_notifier_cleanup(&s->e); out_free_state: @@ -310,6 +355,7 @@ void laio_cleanup(void *s_) struct qemu_laio_state *s = s_; event_notifier_cleanup(&s->e); + event_notifier_cleanup(&s->retry); if (io_destroy(s->ctx) != 0) { fprintf(stderr, "%s: destroy AIO context %p failed\n", -- 1.7.9.5