On Sun, Jun 16, 2013 at 07:21:21PM +0800, Liu Ping Fan wrote:
> BH will be used outside the big lock, so introduce a lock to protect
> the writers from each other, i.e., bh's adders and deleter.
> Note that the lock only affects the writers; bh's callback does
> not take this extra lock.
>
> Signed-off-by: Liu Ping Fan <pingf...@linux.vnet.ibm.com>
> ---
>  async.c             | 21 +++++++++++++++++++++
>  include/block/aio.h |  3 +++
>  2 files changed, 24 insertions(+)
>
> diff --git a/async.c b/async.c
> index 90fe906..6a3269f 100644
> --- a/async.c
> +++ b/async.c
> @@ -47,8 +47,12 @@ QEMUBH *aio_bh_new(AioContext *ctx, QEMUBHFunc *cb, void *opaque)
>      bh->ctx = ctx;
>      bh->cb = cb;
>      bh->opaque = opaque;
> +    qemu_mutex_lock(&ctx->bh_lock);
>      bh->next = ctx->first_bh;
> +    /* Make sure the members are ready before putting bh into the list */
> +    smp_wmb();
>      ctx->first_bh = bh;
> +    qemu_mutex_unlock(&ctx->bh_lock);
>      return bh;
>  }
>
> @@ -61,12 +65,18 @@ int aio_bh_poll(AioContext *ctx)
>
>      ret = 0;
>      for (bh = ctx->first_bh; bh; bh = next) {
> +        /* Make sure bh is fetched before its members are accessed */
> +        smp_read_barrier_depends();
>          next = bh->next;
>          if (!bh->deleted && bh->scheduled) {
>              bh->scheduled = 0;
>              if (!bh->idle)
>                  ret = 1;
>              bh->idle = 0;
> +            /* Paired with the write barrier in bh schedule so the callback
> +             * sees writes done before bh was scheduled.
> +             */
> +            smp_rmb();
>              bh->cb(bh->opaque);
Could we possibly simplify this by introducing a recursive mutex that we
could use to protect the whole list loop and hold even during the cb?

I assume we can't currently hold the lock during the cb since the callback
might try to reschedule the bh, but if it's a recursive mutex, would that
simplify things?

I've been doing something similar with IOHandlers for the QContext stuff,
and that's the approach I took. This patch introduces the recursive mutex:

https://github.com/mdroth/qemu/commit/c7ee0844da62283c9466fcb10ddbfadd0b8bfc53

>          }
>      }
> @@ -75,6 +85,7 @@ int aio_bh_poll(AioContext *ctx)
>
>      /* remove deleted bhs */
>      if (!ctx->walking_bh) {
> +        qemu_mutex_lock(&ctx->bh_lock);
>          bhp = &ctx->first_bh;
>          while (*bhp) {
>              bh = *bhp;
> @@ -85,6 +96,7 @@
>                  bhp = &bh->next;
>              }
>          }
> +        qemu_mutex_unlock(&ctx->bh_lock);
>      }
>
>      return ret;
> @@ -94,6 +106,10 @@ void qemu_bh_schedule_idle(QEMUBH *bh)
>  {
>      if (bh->scheduled)
>          return;
> +    /* Make sure any writes that are needed by the callback are done
> +     * before the locations are read in aio_bh_poll.
> +     */
> +    smp_wmb();
>      bh->scheduled = 1;
>      bh->idle = 1;
>  }
> @@ -102,6 +118,10 @@ void qemu_bh_schedule(QEMUBH *bh)
>  {
>      if (bh->scheduled)
>          return;
> +    /* Make sure any writes that are needed by the callback are done
> +     * before the locations are read in aio_bh_poll.
> +     */
> +    smp_wmb();
>      bh->scheduled = 1;
>      bh->idle = 0;
>      aio_notify(bh->ctx);
> @@ -211,6 +231,7 @@ AioContext *aio_context_new(void)
>      ctx = (AioContext *) g_source_new(&aio_source_funcs, sizeof(AioContext));
>      ctx->pollfds = g_array_new(FALSE, FALSE, sizeof(GPollFD));
>      ctx->thread_pool = NULL;
> +    qemu_mutex_init(&ctx->bh_lock);
>      event_notifier_init(&ctx->notifier, false);
>      aio_set_event_notifier(ctx, &ctx->notifier,
>                             (EventNotifierHandler *)
> diff --git a/include/block/aio.h b/include/block/aio.h
> index 1836793..971fbef 100644
> --- a/include/block/aio.h
> +++ b/include/block/aio.h
> @@ -17,6 +17,7 @@
>  #include "qemu-common.h"
>  #include "qemu/queue.h"
>  #include "qemu/event_notifier.h"
> +#include "qemu/thread.h"
>
>  typedef struct BlockDriverAIOCB BlockDriverAIOCB;
>  typedef void BlockDriverCompletionFunc(void *opaque, int ret);
> @@ -53,6 +54,8 @@ typedef struct AioContext {
>      */
>      int walking_handlers;
>
> +    /* lock to protect between bh's adders and deleter */
> +    QemuMutex bh_lock;
>      /* Anchor of the list of Bottom Halves belonging to the context */
>      struct QEMUBH *first_bh;
>
> --
> 1.8.1.4