On Thu, Sep 4, 2014 at 10:59 PM, Benoît Canet <benoit.ca...@irqsave.net> wrote: > The Thursday 14 Aug 2014 à 17:41:41 (+0800), Ming Lei wrote : >> In the enqueue path, we can't complete request, otherwise >> "Co-routine re-entered recursively" may be caused, so this >> patch fixes the issue with below ideas: > > s/with below ideas/with the following ideas/g > >> >> - for -EAGAIN or partial completion, retry the submision by > > s/submision/submission/ > >> schedule an BH in following completion cb > > s/schedule an/scheduling a/ > >> - for part of completion, also update the io queue >> - for other failure, return the failure if in enqueue path, >> otherwise, abort all queued I/O >> >> Signed-off-by: Ming Lei <ming....@canonical.com> >> --- >> block/linux-aio.c | 99 >> +++++++++++++++++++++++++++++++++++++++++------------ >> 1 file changed, 77 insertions(+), 22 deletions(-) >> >> diff --git a/block/linux-aio.c b/block/linux-aio.c >> index 7ac7e8c..4cdf507 100644 >> --- a/block/linux-aio.c >> +++ b/block/linux-aio.c >> @@ -38,11 +38,19 @@ struct qemu_laiocb { >> QLIST_ENTRY(qemu_laiocb) node; >> }; >> >> +/* >> + * TODO: support to batch I/O from multiple bs in one same >> + * AIO context, one important use case is multi-lun scsi, >> + * so in future the IO queue should be per AIO context. >> + */ >> typedef struct { > > In QEMU we typically write the name twice in these kinds of declarations: > > typedef struct LaioQueue { > ... stuff ... > } LaioQueue; > >> struct iocb *iocbs[MAX_QUEUED_IO]; >> int plugged; > > Are plugged values either 0 or 1? > If so it should be "bool plugged;"
It is actually a reference counter. > >> unsigned int size; >> unsigned int idx; > > See: > > benoit@Laure:~/code/qemu$ git grep "unsigned int"|wc > 2283 14038 154201 > benoit@Laure:~/code/qemu$ git grep "uint32"|wc > 12535 63129 810822 > > Maybe you could use the most popular type. > >> + >> + /* handle -EAGAIN and partial completion */ >> + QEMUBH *retry; >> } LaioQueue; >> >> struct qemu_laio_state { >> @@ -86,6 +94,12 @@ static void qemu_laio_process_completion(struct >> qemu_laio_state *s, >> qemu_aio_release(laiocb); >> } >> >> +static void qemu_laio_start_retry(struct qemu_laio_state *s) >> +{ > > >> + if (s->io_q.idx) >> + qemu_bh_schedule(s->io_q.retry); > > In QEMU this test is written like this: > > if (s->io_q.idx) { > qemu_bh_schedule(s->io_q.retry); > } > > I suggest you run ./scripts/checkpatch.pl on your series before submitting it. I don't know why the below pre-commit hook didn't complain that: [tom@qemu]$cat .git/hooks/pre-commit #!/bin/bash exec git diff --cached | scripts/checkpatch.pl --no-signoff -q - > > >> +} >> + >> static void qemu_laio_completion_cb(EventNotifier *e) >> { >> struct qemu_laio_state *s = container_of(e, struct qemu_laio_state, e); >> @@ -108,6 +122,7 @@ static void qemu_laio_completion_cb(EventNotifier *e) >> qemu_laio_process_completion(s, laiocb); >> } >> } >> + qemu_laio_start_retry(s); >> } >> >> static void laio_cancel(BlockDriverAIOCB *blockacb) >> @@ -127,6 +142,7 @@ static void laio_cancel(BlockDriverAIOCB *blockacb) >> ret = io_cancel(laiocb->ctx->ctx, &laiocb->iocb, &event); >> if (ret == 0) { >> laiocb->ret = -ECANCELED; >> + qemu_laio_start_retry(laiocb->ctx); >> return; >> } >> >> @@ -154,45 +170,80 @@ static void ioq_init(LaioQueue *io_q) >> io_q->plugged = 0; >> } >> >> -static int ioq_submit(struct qemu_laio_state *s) >> +static void abort_queue(struct qemu_laio_state *s) >> +{ >> + int i; >> + for (i = 0; i < s->io_q.idx; i++) { >> + struct qemu_laiocb *laiocb = container_of(s->io_q.iocbs[i], >> + struct 
qemu_laiocb, >> + iocb); >> + laiocb->ret = -EIO; >> + qemu_laio_process_completion(s, laiocb); >> + } >> +} >> + >> +static int ioq_submit(struct qemu_laio_state *s, bool enqueue) >> { >> int ret, i = 0; >> int len = s->io_q.idx; >> + int j = 0; >> >> - do { >> - ret = io_submit(s->ctx, len, s->io_q.iocbs); >> - } while (i++ < 3 && ret == -EAGAIN); >> + if (!len) { >> + return 0; >> + } >> + >> + ret = io_submit(s->ctx, len, s->io_q.iocbs); >> + if (ret == -EAGAIN) { /* retry in following completion cb */ >> + return 0; >> + } else if (ret < 0) { >> + if (enqueue) { >> + return ret; >> + } >> >> - /* empty io queue */ >> - s->io_q.idx = 0; >> + /* in non-queue path, all IOs have to be completed */ >> + abort_queue(s); >> + ret = len; >> + } else if (ret == 0) { >> + goto out; >> + } >> >> - if (ret < 0) { >> - i = 0; >> - } else { >> - i = ret; >> + for (i = ret; i < len; i++) { >> + s->io_q.iocbs[j++] = s->io_q.iocbs[i]; >> } >> >> - for (; i < len; i++) { >> - struct qemu_laiocb *laiocb = >> - container_of(s->io_q.iocbs[i], struct qemu_laiocb, iocb); >> + out: >> + /* >> + * update io queue, for partial completion, retry will be >> + * started automatically in following completion cb. >> + */ >> + s->io_q.idx -= ret; >> >> - laiocb->ret = (ret < 0) ? 
ret : -EIO; >> - qemu_laio_process_completion(s, laiocb); >> - } >> return ret; >> } >> >> -static void ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb) >> +static void ioq_submit_retry(void *opaque) >> +{ >> + struct qemu_laio_state *s = opaque; >> + ioq_submit(s, false); >> +} >> + >> +static int ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb) >> { >> unsigned int idx = s->io_q.idx; >> >> + if (unlikely(idx == s->io_q.size)) { >> + return -1; >> + } >> + >> s->io_q.iocbs[idx++] = iocb; >> s->io_q.idx = idx; >> >> - /* submit immediately if queue is full */ >> - if (idx == s->io_q.size) { >> - ioq_submit(s); >> + /* submit immediately if queue depth is above 2/3 */ >> + if (idx > s->io_q.size * 2 / 3) { >> + return ioq_submit(s, true); >> } >> + >> + return 0; >> } >> >> void laio_io_plug(BlockDriverState *bs, void *aio_ctx) >> @@ -214,7 +265,7 @@ int laio_io_unplug(BlockDriverState *bs, void *aio_ctx, >> bool unplug) >> } >> >> if (s->io_q.idx > 0) { >> - ret = ioq_submit(s); >> + ret = ioq_submit(s, false); >> } >> >> return ret; >> @@ -258,7 +309,9 @@ BlockDriverAIOCB *laio_submit(BlockDriverState *bs, void >> *aio_ctx, int fd, >> goto out_free_aiocb; >> } >> } else { >> - ioq_enqueue(s, iocbs); >> + if (ioq_enqueue(s, iocbs) < 0) { >> + goto out_free_aiocb; >> + } >> } >> return &laiocb->common; >> >> @@ -272,6 +325,7 @@ void laio_detach_aio_context(void *s_, AioContext >> *old_context) >> struct qemu_laio_state *s = s_; >> >> aio_set_event_notifier(old_context, &s->e, NULL); >> + qemu_bh_delete(s->io_q.retry); >> } >> >> void laio_attach_aio_context(void *s_, AioContext *new_context) >> @@ -279,6 +333,7 @@ void laio_attach_aio_context(void *s_, AioContext >> *new_context) >> struct qemu_laio_state *s = s_; >> >> aio_set_event_notifier(new_context, &s->e, qemu_laio_completion_cb); >> + s->io_q.retry = aio_bh_new(new_context, ioq_submit_retry, s); >> } >> >> void *laio_init(void) >> -- >> 1.7.9.5 >> >>