In the submit path, we can't complete requests directly, otherwise "Co-routine re-entered recursively" may be triggered, so this patch fixes the issue with the following ideas:
- for -EAGAIN or partial submission, retry the submission in the
  following completion cb, which is run in BH context
- for partial submission, also update the io queue
- when the io queue is full, submit the queued requests immediately
  and return failure to the caller
- for other failures, abort all queued requests in BH context;
  requests won't be allowed to submit until the aborting is handled

Reviewed-by: Paolo Bonzini <pbonz...@redhat.com>
Signed-off-by: Ming Lei <ming....@canonical.com>
---
 block/linux-aio.c | 116 ++++++++++++++++++++++++++++++++++++++++++++---------
 1 file changed, 97 insertions(+), 19 deletions(-)

diff --git a/block/linux-aio.c b/block/linux-aio.c
index d92513b..53c5616 100644
--- a/block/linux-aio.c
+++ b/block/linux-aio.c
@@ -38,11 +38,21 @@ struct qemu_laiocb {
     QLIST_ENTRY(qemu_laiocb) node;
 };
 
+/*
+ * TODO: support batching I/O from multiple bs in the same
+ * AIO context; one important use case is multi-lun scsi,
+ * so in future the IO queue should be per AIO context.
+ */
 typedef struct {
     struct iocb *iocbs[MAX_QUEUED_IO];
     int plugged;
     unsigned int size;
     unsigned int idx;
+
+    /* abort queued requests in BH context */
+    QEMUBH *abort_bh;
+    bool aborting;
+    int abort_ret;
 } LaioQueue;
 
 struct qemu_laio_state {
@@ -59,6 +69,8 @@ struct qemu_laio_state {
     int event_max;
 };
 
+static int ioq_submit(struct qemu_laio_state *s);
+
 static inline ssize_t io_event_ret(struct io_event *ev)
 {
     return (ssize_t)(((uint64_t)ev->res2 << 32) | ev->res);
@@ -135,6 +147,11 @@ static void qemu_laio_completion_bh(void *opaque)
 
         qemu_laio_process_completion(s, laiocb);
     }
+
+    /* Handle -EAGAIN or partial submission */
+    if (s->io_q.idx) {
+        ioq_submit(s);
+    }
 }
 
 static void qemu_laio_completion_cb(EventNotifier *e)
@@ -175,47 +192,100 @@ static void ioq_init(LaioQueue *io_q)
     io_q->size = MAX_QUEUED_IO;
     io_q->idx = 0;
     io_q->plugged = 0;
+    io_q->aborting = false;
 }
 
+/* Always returns >= 0; the value is how many requests were submitted */
 static int ioq_submit(struct qemu_laio_state *s)
 {
-    int ret, i = 0;
+    int ret;
     int len = s->io_q.idx;
 
-    do {
-        ret = io_submit(s->ctx, len, s->io_q.iocbs);
-    } while (i++ < 3 && ret == -EAGAIN);
-
-    /* empty io queue */
-    s->io_q.idx = 0;
+    if (!len) {
+        return 0;
+    }
 
+    ret = io_submit(s->ctx, len, s->io_q.iocbs);
     if (ret < 0) {
-        i = 0;
-    } else {
-        i = ret;
+        /* retry in the following completion cb */
+        if (ret == -EAGAIN) {
+            return 0;
+        }
+
+        /*
+         * Abort in BH context to avoid Co-routine re-entrance,
+         * and update the io queue at that time
+         */
+        s->io_q.aborting = true;
+        s->io_q.abort_ret = ret;
+        qemu_bh_schedule(s->io_q.abort_bh);
+        ret = 0;
     }
 
-    for (; i < len; i++) {
-        struct qemu_laiocb *laiocb =
-            container_of(s->io_q.iocbs[i], struct qemu_laiocb, iocb);
+    /*
+     * update the io queue; retry is started automatically
+     * in the following completion cb for the remainder
+     */
+    if (ret > 0) {
+        if (ret < len) {
+            memmove(&s->io_q.iocbs[0], &s->io_q.iocbs[ret],
+                    (len - ret) * sizeof(struct iocb *));
+        }
+        s->io_q.idx -= ret;
+    }
+
+    return ret;
+}
 
-        laiocb->ret = (ret < 0) ? ret : -EIO;
+static void ioq_abort_bh(void *opaque)
+{
+    struct qemu_laio_state *s = opaque;
+    int i;
+
+    for (i = 0; i < s->io_q.idx; i++) {
+        struct qemu_laiocb *laiocb = container_of(s->io_q.iocbs[i],
+                                                  struct qemu_laiocb,
+                                                  iocb);
+        laiocb->ret = s->io_q.abort_ret;
         qemu_laio_process_completion(s, laiocb);
     }
-    return ret;
+
+    s->io_q.idx = 0;
+    s->io_q.aborting = false;
 }
 
-static void ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb)
+static int ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb)
 {
     unsigned int idx = s->io_q.idx;
 
+    /* Requests can't be allowed to submit until aborting is handled */
+    if (unlikely(s->io_q.aborting)) {
+        return -EIO;
+    }
+
+    if (unlikely(idx == s->io_q.size)) {
+        ioq_submit(s);
+
+        if (unlikely(s->io_q.aborting)) {
+            return -EIO;
+        }
+        idx = s->io_q.idx;
+    }
+
+    /* It has to return now if the queue is still full */
+    if (unlikely(idx == s->io_q.size)) {
+        return -EAGAIN;
+    }
+
     s->io_q.iocbs[idx++] = iocb;
     s->io_q.idx = idx;
 
-    /* submit immediately if queue is full */
-    if (idx == s->io_q.size) {
+    /* submit immediately if queue depth is above 2/3 */
+    if (idx > s->io_q.size * 2 / 3) {
         ioq_submit(s);
     }
+
+    return 0;
 }
 
 void laio_io_plug(BlockDriverState *bs, void *aio_ctx)
@@ -281,7 +351,9 @@ BlockAIOCB *laio_submit(BlockDriverState *bs, void *aio_ctx, int fd,
             goto out_free_aiocb;
         }
     } else {
-        ioq_enqueue(s, iocbs);
+        if (ioq_enqueue(s, iocbs) < 0) {
+            goto out_free_aiocb;
+        }
     }
 
     return &laiocb->common;
@@ -296,14 +368,20 @@ void laio_detach_aio_context(void *s_, AioContext *old_context)
 
     aio_set_event_notifier(old_context, &s->e, NULL);
     qemu_bh_delete(s->completion_bh);
+    qemu_bh_delete(s->io_q.abort_bh);
 }
 
 void laio_attach_aio_context(void *s_, AioContext *new_context)
 {
     struct qemu_laio_state *s = s_;
 
+    s->io_q.abort_bh = aio_bh_new(new_context, ioq_abort_bh, s);
     s->completion_bh = aio_bh_new(new_context, qemu_laio_completion_bh, s);
    aio_set_event_notifier(new_context, &s->e, qemu_laio_completion_cb);
+
+    if (s->io_q.aborting) {
+        qemu_bh_schedule(s->io_q.abort_bh);
+    }
 }
 
 void *laio_init(void)
-- 
1.7.9.5
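
(Appendix, not part of the patch: a minimal standalone sketch of the partial-submission
handling described above. All names here are made up for illustration -- fake_submit()
stands in for io_submit(), and pending[]/pending_len stand in for io_q.iocbs[]/io_q.idx --
but the memmove() requeue of the unsubmitted tail mirrors the logic in ioq_submit().)

/*
 * Illustrative sketch only, not QEMU code: shows how a partially
 * accepted batch is kept at the head of the queue and retried later.
 */
#include <stdio.h>
#include <string.h>

#define QUEUE_SIZE 8

static void *pending[QUEUE_SIZE];   /* stand-in for io_q.iocbs[] */
static unsigned int pending_len;    /* stand-in for io_q.idx */

/* Pretend the kernel accepts only about half of each batch. */
static int fake_submit(void **reqs, unsigned int len)
{
    (void)reqs;
    return (int)((len + 1) / 2);
}

/* Same shape as ioq_submit(): returns how many requests were submitted. */
static int flush_queue(void)
{
    unsigned int len = pending_len;
    int ret;

    if (!len) {
        return 0;
    }

    ret = fake_submit(pending, len);
    if (ret < 0) {
        return 0;           /* the patch defers real errors to a BH */
    }

    if ((unsigned int)ret < len) {
        /* shift the unsubmitted tail to the front, as the patch does */
        memmove(&pending[0], &pending[ret],
                (len - ret) * sizeof(void *));
    }
    pending_len -= ret;
    return ret;
}

int main(void)
{
    static int dummy[QUEUE_SIZE];
    unsigned int i;

    /* queue up a full batch of fake requests */
    for (i = 0; i < QUEUE_SIZE; i++) {
        pending[pending_len++] = &dummy[i];
    }

    /* keep flushing until everything has been "submitted" */
    while (pending_len) {
        int done = flush_queue();
        printf("submitted %d, %u still queued\n", done, pending_len);
    }
    return 0;
}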