On 12/07/2016 17:25, Roman Pen wrote:
> AIO context in userspace is represented as a simple ring buffer that
> can be consumed directly, without entering the kernel, which obviously
> can bring some performance gain. QEMU does not use a timeout value when
> waiting for event completions, so we can consume all events from
> userspace.
>
> My VM configuration is the following:
>
>     VCPU=8, MQ=8, 1 iothread
>
> FIO results:
>
> io_getevents in kernel:
>
>     READ: io=56479MB, aggrb=1882.5MB/s, minb=1882.5MB/s, maxb=1882.5MB/s,
>           mint=30003msec, maxt=30003msec
>    WRITE: io=56371MB, aggrb=1878.9MB/s, minb=1878.9MB/s, maxb=1878.9MB/s,
>           mint=30003msec, maxt=30003msec
>
> io_getevents in userspace:
>
>     READ: io=57576MB, aggrb=1919.2MB/s, minb=1919.2MB/s, maxb=1919.2MB/s,
>           mint=30003msec, maxt=30003msec
>    WRITE: io=57492MB, aggrb=1916.3MB/s, minb=1916.3MB/s, maxb=1916.3MB/s,
>           mint=30003msec, maxt=30003msec
>
> The gain is only ~2%, but the price is almost nothing.
>
> Signed-off-by: Roman Pen <roman.peny...@profitbricks.com>
> Cc: Stefan Hajnoczi <stefa...@redhat.com>
> Cc: Paolo Bonzini <pbonz...@redhat.com>
> Cc: qemu-devel@nongnu.org
> ---
>  block/linux-aio.c | 86 ++++++++++++++++++++++++++++++++++++++++++++-----------
>  1 file changed, 70 insertions(+), 16 deletions(-)
The patch does not pass checkpatch.pl, but apart from this it's a nice
trick.

Paolo

> diff --git a/block/linux-aio.c b/block/linux-aio.c
> index fe7cece..4ab09c0 100644
> --- a/block/linux-aio.c
> +++ b/block/linux-aio.c
> @@ -58,7 +58,7 @@ struct LinuxAioState {
>
>      /* I/O completion processing */
>      QEMUBH *completion_bh;
> -    struct io_event events[MAX_EVENTS];
> +    struct io_event *events;
>      int event_idx;
>      int event_max;
>  };
> @@ -101,6 +101,60 @@ static void qemu_laio_process_completion(struct qemu_laiocb *laiocb)
>      }
>  }
>
> +/**
> + * aio_ring buffer which is shared between userspace and kernel.
> + */
> +struct aio_ring {
> +    unsigned id;               /* kernel internal index number */
> +    unsigned nr;               /* number of io_events */
> +    unsigned head;             /* Written to by userland or under ring_lock
> +                                * mutex by aio_read_events_ring(). */
> +    unsigned tail;
> +
> +    unsigned magic;
> +    unsigned compat_features;
> +    unsigned incompat_features;
> +    unsigned header_length;    /* size of aio_ring */
> +
> +    struct io_event io_events[0];
> +};
> +
> +/**
> + * io_getevents_peek() - returns the number of completed events
> + * and sets a pointer on events array.
> + *
> + * This function does not update the internal ring buffer, only
> + * reads head and tail. When @events has been processed
> + * io_getevents_commit() must be called.
> + */
> +static int io_getevents_peek(io_context_t aio_ctx, unsigned int max,
> +                             struct io_event **events)
> +{
> +    struct aio_ring *ring = (struct aio_ring*)aio_ctx;
> +    unsigned head = ring->head, tail = ring->tail;
> +    unsigned nr;
> +
> +    nr = tail >= head ? tail - head : ring->nr - head;
> +    *events = ring->io_events + head;
> +    /* To avoid speculative loads of s->events[i] before observing tail.
> +       Paired with smp_wmb() inside linux/fs/aio.c: aio_complete(). */
> +    smp_rmb();
> +
> +    return nr;
> +}
> +
> +/**
> + * io_getevents_commit() - advances head of a ring buffer and returns
> + * 1 if there are some events left in a ring.
> + */
> +static int io_getevents_commit(io_context_t aio_ctx, unsigned int nr)
> +{
> +    struct aio_ring *ring = (struct aio_ring*)aio_ctx;
> +
> +    ring->head = (ring->head + nr) % ring->nr;
> +    return (ring->head != ring->tail);
> +}
> +
>  /* The completion BH fetches completed I/O requests and invokes their
>   * callbacks.
>   *
> @@ -118,32 +172,32 @@ static void qemu_laio_completion_bh(void *opaque)
>
>      /* Fetch more completion events when empty */
>      if (s->event_idx == s->event_max) {
> -        do {
> -            struct timespec ts = { 0 };
> -            s->event_max = io_getevents(s->ctx, MAX_EVENTS, MAX_EVENTS,
> -                                        s->events, &ts);
> -        } while (s->event_max == -EINTR);
> -
> +more:
> +        s->event_max = io_getevents_peek(s->ctx, MAX_EVENTS, &s->events);
>          s->event_idx = 0;
> -        if (s->event_max <= 0) {
> -            s->event_max = 0;
> +        if (s->event_max == 0)
>              return; /* no more events */
> -        }
>      }
>
>      /* Reschedule so nested event loops see currently pending completions */
>      qemu_bh_schedule(s->completion_bh);
>
>      /* Process completion events */
> -    while (s->event_idx < s->event_max) {
> -        struct iocb *iocb = s->events[s->event_idx].obj;
> -        struct qemu_laiocb *laiocb =
> +    if (s->event_idx < s->event_max) {
> +        unsigned nr = s->event_max - s->event_idx;
> +
> +        while (s->event_idx < s->event_max) {
> +            struct iocb *iocb = s->events[s->event_idx].obj;
> +            struct qemu_laiocb *laiocb =
>                  container_of(iocb, struct qemu_laiocb, iocb);
>
> -        laiocb->ret = io_event_ret(&s->events[s->event_idx]);
> -        s->event_idx++;
> +            laiocb->ret = io_event_ret(&s->events[s->event_idx]);
> +            s->event_idx++;
>
> -        qemu_laio_process_completion(laiocb);
> +            qemu_laio_process_completion(laiocb);
> +        }
> +        if (io_getevents_commit(s->ctx, nr))
> +            goto more;
>      }
>
>      if (!s->io_q.plugged && !QSIMPLEQ_EMPTY(&s->io_q.pending)) {
>
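For reference, here is a rough, untested standalone sketch of the same
idea outside QEMU (my names, not the patch's): it casts the io_context_t
returned by io_setup() to the aio_ring layout above, validates the
kernel's magic value before trusting it, and spells the kernel-style
smp_rmb()/smp_wmb() pairing as GCC acquire/release builtins instead.
AIO_RING_MAGIC comes from linux/fs/aio.c; everything else about the
layout is a kernel implementation detail, so the sketch keeps a fallback
to plain io_getevents() for unrecognized rings.

#include <libaio.h>

struct aio_ring {
    unsigned id, nr, head, tail;
    unsigned magic, compat_features, incompat_features, header_length;
    struct io_event io_events[0];
};

#define AIO_RING_MAGIC 0xa10a10a1   /* from linux/fs/aio.c */

/* Consume all completed events directly from the mmap'ed ring.
 * Returns the number of events handled, or -1 when the ring layout is
 * not recognized and the caller should fall back to io_getevents(). */
static int drain_ring(io_context_t ctx, void (*cb)(struct io_event *ev))
{
    struct aio_ring *ring = (struct aio_ring *)ctx;
    unsigned head, tail;
    int n = 0;

    if (ring->magic != AIO_RING_MAGIC) {
        return -1;
    }

    head = ring->head;
    /* Acquire pairs with the kernel's smp_wmb() in aio_complete():
     * io_events written before tail was bumped are visible below. */
    tail = __atomic_load_n(&ring->tail, __ATOMIC_ACQUIRE);

    while (head != tail) {
        cb(&ring->io_events[head]);
        head = (head + 1) % ring->nr;
        n++;
    }

    /* Publish the new head only after we are done reading the slots,
     * so the kernel does not overwrite events we still need. */
    __atomic_store_n(&ring->head, head, __ATOMIC_RELEASE);
    return n;
}

Note this only works because QEMU never passes a timeout to
io_getevents() here; if you needed to block with a timeout you would
still have to enter the kernel, which is why the patch can drop the
struct timespec entirely.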