On Friday 05 Sep 2014 at 00:27:07 (+0800), Ming Lei wrote:
> In the enqueue path, we can't complete requests, otherwise
> "Co-routine re-entered recursively" may be caused, so this
> patch fixes the issue with the following ideas:
>
> - for -EAGAIN or partial completion, retry the submission by
>   scheduling a BH in the following completion cb
> - for partial completion, also update the io queue
> - for any other failure, return the failure if in the enqueue
>   path, otherwise abort all queued I/O
>
> Signed-off-by: Ming Lei <ming....@canonical.com>
> ---
>  block/linux-aio.c |  106 ++++++++++++++++++++++++++++++++++++++++-------------
>  1 file changed, 81 insertions(+), 25 deletions(-)
>
> diff --git a/block/linux-aio.c b/block/linux-aio.c
> index 9aca758..a06576d 100644
> --- a/block/linux-aio.c
> +++ b/block/linux-aio.c
> @@ -38,11 +38,19 @@ struct qemu_laiocb {
>      QLIST_ENTRY(qemu_laiocb) node;
>  };
>
> -typedef struct {
> +/*
> + * TODO: support batching I/O from multiple bs in the same
> + * AIO context; one important use case is multi-lun SCSI,
> + * so in the future the IO queue should be per AIO context.
> + */
> +typedef struct LaioQueue {
>      struct iocb *iocbs[MAX_QUEUED_IO];
>      int plugged;
> -    unsigned int size;
> -    unsigned int idx;
> +    uint32_t size;
> +    uint32_t idx;
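As an aside for anyone following the thread: the new retry field below is a
plain QEMUBH, so the deferred resubmission follows the usual bottom-half
lifecycle. A minimal sketch of that pattern (aio_bh_new, qemu_bh_schedule and
qemu_bh_delete are the standard QEMU BH API; retry_cb and the surrounding
names are just illustration, not code from this patch):

    /* created once, when the AIO context is attached */
    s->io_q.retry = aio_bh_new(ctx, retry_cb, s);

    /* armed from a completion callback; retry_cb(s) then runs later
     * from the event loop, i.e. outside the enqueue path where
     * completing requests would re-enter the coroutine */
    qemu_bh_schedule(s->io_q.retry);

    /* torn down when the AIO context is detached */
    qemu_bh_delete(s->io_q.retry);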
Sorry Ming, I said crap about the struct, size and idx. I initially misread
and thought you were adding these. You were right from the start.

> +
> +    /* handle -EAGAIN and partial completion */
> +    QEMUBH *retry;
>  } LaioQueue;
>
>  struct qemu_laio_state {
> @@ -138,6 +146,13 @@ static void qemu_laio_completion_bh(void *opaque)
>      }
>  }
>
> +static void qemu_laio_start_retry(struct qemu_laio_state *s)
> +{
> +    if (s->io_q.idx) {
> +        qemu_bh_schedule(s->io_q.retry);
> +    }
> +}
> +
>  static void qemu_laio_completion_cb(EventNotifier *e)
>  {
>      struct qemu_laio_state *s = container_of(e, struct qemu_laio_state, e);
> @@ -145,6 +160,7 @@ static void qemu_laio_completion_cb(EventNotifier *e)
>      if (event_notifier_test_and_clear(&s->e)) {
>          qemu_bh_schedule(s->completion_bh);
>      }
> +    qemu_laio_start_retry(s);
>  }
>
>  static void laio_cancel(BlockDriverAIOCB *blockacb)
> @@ -164,6 +180,7 @@ static void laio_cancel(BlockDriverAIOCB *blockacb)
>      ret = io_cancel(laiocb->ctx->ctx, &laiocb->iocb, &event);
>      if (ret == 0) {
>          laiocb->ret = -ECANCELED;
> +        qemu_laio_start_retry(laiocb->ctx);
>          return;
>      }
>
> @@ -191,45 +208,80 @@ static void ioq_init(LaioQueue *io_q)
>      io_q->plugged = 0;
>  }
>
> -static int ioq_submit(struct qemu_laio_state *s)
> +static void abort_queue(struct qemu_laio_state *s)
> +{
> +    int i;
> +    for (i = 0; i < s->io_q.idx; i++) {
> +        struct qemu_laiocb *laiocb = container_of(s->io_q.iocbs[i],
> +                                                  struct qemu_laiocb,
> +                                                  iocb);
> +        laiocb->ret = -EIO;
> +        qemu_laio_process_completion(s, laiocb);
> +    }
> +}
> +
> +static int ioq_submit(struct qemu_laio_state *s, bool enqueue)
>  {
>      int ret, i = 0;
>      int len = s->io_q.idx;
> +    int j = 0;
>
> -    do {
> -        ret = io_submit(s->ctx, len, s->io_q.iocbs);
> -    } while (i++ < 3 && ret == -EAGAIN);
> +    if (!len) {
> +        return 0;
> +    }
>
> -    /* empty io queue */
> -    s->io_q.idx = 0;
> +    ret = io_submit(s->ctx, len, s->io_q.iocbs);
> +    if (ret == -EAGAIN) { /* retry in the following completion cb */
> +        return 0;
> +    } else if (ret < 0) {
> +        if (enqueue) {
> +            return ret;
> +        }
>
> -    if (ret < 0) {
> -        i = 0;
> -    } else {
> -        i = ret;
> +        /* in the non-enqueue path, all IOs have to be completed */
> +        abort_queue(s);
> +        ret = len;
> +    } else if (ret == 0) {
> +        goto out;
>      }
>
> -    for (; i < len; i++) {
> -        struct qemu_laiocb *laiocb =
> -            container_of(s->io_q.iocbs[i], struct qemu_laiocb, iocb);
> -
> -        laiocb->ret = (ret < 0) ? ret : -EIO;
> -        qemu_laio_process_completion(s, laiocb);
> +    for (i = ret; i < len; i++) {
> +        s->io_q.iocbs[j++] = s->io_q.iocbs[i];
>      }
> +
> + out:
> +    /*
> +     * update the io queue; for partial completion, retry will be
> +     * started automatically in the following completion cb.
> +     */
> +    s->io_q.idx -= ret;
> +
>      return ret;
>  }
>
> -static void ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb)
> +static void ioq_submit_retry(void *opaque)
> +{
> +    struct qemu_laio_state *s = opaque;
> +    ioq_submit(s, false);
> +}
> +
> +static int ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb)
>  {
>      unsigned int idx = s->io_q.idx;
>
> +    if (unlikely(idx == s->io_q.size)) {
> +        return -1;
> +    }
> +
>      s->io_q.iocbs[idx++] = iocb;
>      s->io_q.idx = idx;
>
> -    /* submit immediately if queue is full */
> -    if (idx == s->io_q.size) {
> -        ioq_submit(s);
> +    /* submit immediately if the queue is more than 2/3 full */
> +    if (idx > s->io_q.size * 2 / 3) {
> +        return ioq_submit(s, true);
>      }
> +
> +    return 0;
>  }
>
>  void laio_io_plug(BlockDriverState *bs, void *aio_ctx)
> @@ -251,7 +303,7 @@ int laio_io_unplug(BlockDriverState *bs, void *aio_ctx, bool unplug)
>      }
>
>      if (s->io_q.idx > 0) {
> -        ret = ioq_submit(s);
> +        ret = ioq_submit(s, false);
>      }
>
>      return ret;
> @@ -295,7 +347,9 @@ BlockDriverAIOCB *laio_submit(BlockDriverState *bs, void *aio_ctx, int fd,
>              goto out_free_aiocb;
>          }
>      } else {
> -        ioq_enqueue(s, iocbs);
> +        if (ioq_enqueue(s, iocbs) < 0) {
> +            goto out_free_aiocb;
> +        }
>      }
>      return &laiocb->common;
>
> @@ -310,12 +364,14 @@ void laio_detach_aio_context(void *s_, AioContext *old_context)
>
>      aio_set_event_notifier(old_context, &s->e, NULL);
>      qemu_bh_delete(s->completion_bh);
> +    qemu_bh_delete(s->io_q.retry);
>  }
>
>  void laio_attach_aio_context(void *s_, AioContext *new_context)
>  {
>      struct qemu_laio_state *s = s_;
>
> +    s->io_q.retry = aio_bh_new(new_context, ioq_submit_retry, s);
>      s->completion_bh = aio_bh_new(new_context, qemu_laio_completion_bh, s);
>      aio_set_event_notifier(new_context, &s->e, qemu_laio_completion_cb);
>  }
> --
> 1.7.9.5
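The index arithmetic on partial submission looks correct to me. For the
record, a tiny standalone illustration of the compaction loop (plain C with
made-up values, nothing QEMU-specific):

    #include <stdio.h>

    int main(void)
    {
        int iocbs[8] = {10, 11, 12, 13, 14};  /* 5 pending requests */
        int idx = 5;                          /* plays s->io_q.idx */
        int ret = 3;                          /* io_submit() accepted 3 */
        int i, j = 0;

        /* shift the unsubmitted tail to the front, as ioq_submit() does */
        for (i = ret; i < idx; i++) {
            iocbs[j++] = iocbs[i];
        }
        idx -= ret;

        for (i = 0; i < idx; i++) {
            printf("%d ", iocbs[i]);          /* prints: 13 14 */
        }
        printf("(idx=%d)\n", idx);
        return 0;
    }

The two leftover requests end up at the head of the queue and are pushed
again by the retry BH the next time qemu_laio_completion_cb() fires.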