In the enqueue path, we can't complete the request directly, otherwise a
"Co-routine re-entered recursively" error may be triggered, so this patch
fixes the issue with the following ideas:
- for -EAGAIN or partial completion, retry the submission by
  scheduling a BH in the following completion cb
- for partial completion, also update the io queue
- for any other failure, return the failure if in the enqueue
  path; otherwise, abort all queued I/O

(A toy sketch of the requeue arithmetic follows the patch.)

Signed-off-by: Ming Lei <ming....@canonical.com>
---
 block/linux-aio.c |   99 +++++++++++++++++++++++++++++++++++++++++------------
 1 file changed, 77 insertions(+), 22 deletions(-)

diff --git a/block/linux-aio.c b/block/linux-aio.c
index 7ac7e8c..4cdf507 100644
--- a/block/linux-aio.c
+++ b/block/linux-aio.c
@@ -38,11 +38,19 @@ struct qemu_laiocb {
     QLIST_ENTRY(qemu_laiocb) node;
 };
 
+/*
+ * TODO: support batching I/O from multiple bs in the same
+ * AIO context; one important use case is multi-LUN SCSI,
+ * so in the future the IO queue should be per AIO context.
+ */
 typedef struct {
     struct iocb *iocbs[MAX_QUEUED_IO];
     int plugged;
     unsigned int size;
     unsigned int idx;
+
+    /* handle -EAGAIN and partial completion */
+    QEMUBH *retry;
 } LaioQueue;
 
 struct qemu_laio_state {
@@ -86,6 +94,12 @@ static void qemu_laio_process_completion(struct qemu_laio_state *s,
     qemu_aio_release(laiocb);
 }
 
+static void qemu_laio_start_retry(struct qemu_laio_state *s)
+{
+    if (s->io_q.idx)
+        qemu_bh_schedule(s->io_q.retry);
+}
+
 static void qemu_laio_completion_cb(EventNotifier *e)
 {
     struct qemu_laio_state *s = container_of(e, struct qemu_laio_state, e);
@@ -108,6 +122,7 @@ static void qemu_laio_completion_cb(EventNotifier *e)
             qemu_laio_process_completion(s, laiocb);
         }
     }
+    qemu_laio_start_retry(s);
 }
 
 static void laio_cancel(BlockDriverAIOCB *blockacb)
@@ -127,6 +142,7 @@ static void laio_cancel(BlockDriverAIOCB *blockacb)
     ret = io_cancel(laiocb->ctx->ctx, &laiocb->iocb, &event);
     if (ret == 0) {
         laiocb->ret = -ECANCELED;
+        qemu_laio_start_retry(laiocb->ctx);
         return;
     }
 
@@ -154,45 +170,80 @@ static void ioq_init(LaioQueue *io_q)
     io_q->plugged = 0;
 }
 
-static int ioq_submit(struct qemu_laio_state *s)
+static void abort_queue(struct qemu_laio_state *s)
+{
+    int i;
+    for (i = 0; i < s->io_q.idx; i++) {
+        struct qemu_laiocb *laiocb = container_of(s->io_q.iocbs[i],
+                                                  struct qemu_laiocb,
+                                                  iocb);
+        laiocb->ret = -EIO;
+        qemu_laio_process_completion(s, laiocb);
+    }
+}
+
+static int ioq_submit(struct qemu_laio_state *s, bool enqueue)
 {
     int ret, i = 0;
     int len = s->io_q.idx;
+    int j = 0;
 
-    do {
-        ret = io_submit(s->ctx, len, s->io_q.iocbs);
-    } while (i++ < 3 && ret == -EAGAIN);
+    if (!len) {
+        return 0;
+    }
+
+    ret = io_submit(s->ctx, len, s->io_q.iocbs);
+    if (ret == -EAGAIN) { /* retry in following completion cb */
+        return 0;
+    } else if (ret < 0) {
+        if (enqueue) {
+            return ret;
+        }
 
-    /* empty io queue */
-    s->io_q.idx = 0;
+        /* in non-enqueue path, all IOs have to be completed */
+        abort_queue(s);
+        ret = len;
+    } else if (ret == 0) {
+        goto out;
+    }
 
-    if (ret < 0) {
-        i = 0;
-    } else {
-        i = ret;
+    for (i = ret; i < len; i++) {
+        s->io_q.iocbs[j++] = s->io_q.iocbs[i];
     }
 
-    for (; i < len; i++) {
-        struct qemu_laiocb *laiocb =
-            container_of(s->io_q.iocbs[i], struct qemu_laiocb, iocb);
+ out:
+    /*
+     * update the io queue; for partial completion, retry will
+     * be started automatically in the following completion cb.
+     */
+    s->io_q.idx -= ret;
 
-        laiocb->ret = (ret < 0) ? ret : -EIO;
-        qemu_laio_process_completion(s, laiocb);
-    }
     return ret;
 }
 
-static void ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb)
+static void ioq_submit_retry(void *opaque)
+{
+    struct qemu_laio_state *s = opaque;
+    ioq_submit(s, false);
+}
+
+static int ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb)
 {
     unsigned int idx = s->io_q.idx;
 
+    if (unlikely(idx == s->io_q.size)) {
+        return -1;
+    }
+
     s->io_q.iocbs[idx++] = iocb;
     s->io_q.idx = idx;
 
-    /* submit immediately if queue is full */
-    if (idx == s->io_q.size) {
-        ioq_submit(s);
+    /* submit immediately if queue depth is above 2/3 */
+    if (idx > s->io_q.size * 2 / 3) {
+        return ioq_submit(s, true);
     }
+
+    return 0;
 }
 
 void laio_io_plug(BlockDriverState *bs, void *aio_ctx)
@@ -214,7 +265,7 @@ int laio_io_unplug(BlockDriverState *bs, void *aio_ctx, bool unplug)
     }
 
     if (s->io_q.idx > 0) {
-        ret = ioq_submit(s);
+        ret = ioq_submit(s, false);
     }
 
     return ret;
@@ -258,7 +309,9 @@ BlockDriverAIOCB *laio_submit(BlockDriverState *bs, void *aio_ctx, int fd,
             goto out_free_aiocb;
         }
     } else {
-        ioq_enqueue(s, iocbs);
+        if (ioq_enqueue(s, iocbs) < 0) {
+            goto out_free_aiocb;
+        }
     }
 
     return &laiocb->common;
@@ -272,6 +325,7 @@ void laio_detach_aio_context(void *s_, AioContext *old_context)
     struct qemu_laio_state *s = s_;
 
     aio_set_event_notifier(old_context, &s->e, NULL);
+    qemu_bh_delete(s->io_q.retry);
 }
 
 void laio_attach_aio_context(void *s_, AioContext *new_context)
@@ -279,6 +333,7 @@ void laio_attach_aio_context(void *s_, AioContext *new_context)
     struct qemu_laio_state *s = s_;
 
     aio_set_event_notifier(new_context, &s->e, qemu_laio_completion_cb);
+    s->io_q.retry = aio_bh_new(new_context, ioq_submit_retry, s);
 }
 
 void *laio_init(void)
--
1.7.9.5
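
For reference, the requeue arithmetic at the heart of the new ioq_submit()
can be exercised in isolation. The sketch below is a toy model, not QEMU
code: ToyQueue and requeue_leftover are made-up names, and plain ints stand
in for the struct iocb pointers. It shows how, after io_submit() accepts
only the first 'ret' of 'len' queued requests, the unsubmitted tail is
shifted to the queue head and idx is shrunk, leaving the leftovers for the
BH-driven retry.

#include <stdio.h>

#define MAX_QUEUED_IO 128

typedef struct {
    int iocbs[MAX_QUEUED_IO];   /* stand-in for struct iocb *[] */
    unsigned int idx;           /* number of queued entries */
} ToyQueue;

/*
 * Model of the partial-completion path: the kernel accepted only the
 * first 'ret' requests, so shift the unsubmitted tail [ret, len) to
 * the front of the queue and shrink idx by 'ret'.
 */
static void requeue_leftover(ToyQueue *q, int ret)
{
    int len = q->idx;
    int i, j = 0;

    for (i = ret; i < len; i++) {
        q->iocbs[j++] = q->iocbs[i];
    }
    q->idx -= ret;   /* leftovers are retried from the completion cb */
}

int main(void)
{
    ToyQueue q = { .iocbs = { 10, 11, 12, 13, 14 }, .idx = 5 };

    requeue_leftover(&q, 3);    /* pretend io_submit() returned 3 */

    for (unsigned int i = 0; i < q.idx; i++) {
        printf("still pending: %d\n", q.iocbs[i]);  /* prints 13, 14 */
    }
    return 0;
}

Running it leaves entries 13 and 14 in slots 0 and 1 with idx == 2; in the
patch, the BH scheduled from qemu_laio_completion_cb() then resubmits
exactly those remaining iocbs.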