Il 27/08/2013 16:39, Stefan Hajnoczi ha scritto: > It can be useful to run an AioContext from a thread which normally does > not "own" the AioContext. For example, request draining can be > implemented by acquiring the AioContext and looping aio_poll() until all > requests have been completed. > > The following pattern should work: > > /* Event loop thread */ > while (running) { > aio_context_acquire(ctx); > aio_poll(ctx, true); > aio_context_release(ctx); > } > > /* Another thread */ > aio_context_acquire(ctx); > bdrv_read(bs, 0x1000, buf, 1); > aio_context_release(ctx); > > This patch implements aio_context_acquire() and aio_context_release(). > Note that existing aio_poll() callers do not need to worry about > acquiring and releasing - it is only needed when multiple threads will > call aio_poll() on the same AioContext. > > Signed-off-by: Stefan Hajnoczi <stefa...@redhat.com>
Really, really nice! The "kick owner thread" technique is a very interesting way to avoid dropping the lock around aio_poll's ppoll system call. On top of this, I think it would be useful to make aio_context_acquire support recursive acquisition by returning a bool if the current thread is already the owner. Recursive acquisition != recursive locking! :) In fact, acquisition and releasing could be done directly by the synchronous block I/O functions perhaps? One comment: ctx->owner is really "ctx->owned", if you replace the argument of qemu_thread_is_self(ctx->owner) with &ctx->owner_thread. It is probably a bit clearer that way. Paolo > --- > I previously sent patches that implement bdrv_drain_all() by stopping > dataplane > threads. AioContext acquire()/release() is a more general solution than > temporarily stopping dataplane threads. This solution is less hacky and also > supported by other event loops like GMainContext. > > No need to commit this patch yet, I still want to build things on top of it > before submitting a final version. > > async.c | 27 +++++++++++++++++++++++++ > include/block/aio.h | 13 ++++++++++++ > tests/test-aio.c | 58 > +++++++++++++++++++++++++++++++++++++++++++++++++++++ > 3 files changed, 98 insertions(+) > > diff --git a/include/block/aio.h b/include/block/aio.h > index 5743bf1..9035e87 100644 > --- a/include/block/aio.h > +++ b/include/block/aio.h > @@ -45,6 +45,11 @@ typedef void IOHandler(void *opaque); > typedef struct AioContext { > GSource source; > > + QemuMutex acquire_lock; > + QemuCond acquire_cond; > + QemuThread owner_thread; > + QemuThread *owner; > + > /* The list of registered AIO handlers */ > QLIST_HEAD(, AioHandler) aio_handlers; > > @@ -99,6 +104,14 @@ void aio_context_ref(AioContext *ctx); > */ > void aio_context_unref(AioContext *ctx); > > +/* Take ownership of the AioContext. If the AioContext will be shared > between > + * threads, a thread must have ownership when calling aio_poll(). 
> + */ > +void aio_context_acquire(AioContext *ctx); > + > +/* Relinquish ownership of the AioContext. */ > +void aio_context_release(AioContext *ctx); > + > /** > * aio_bh_new: Allocate a new bottom half structure. > * > diff --git a/async.c b/async.c > index 9791d8e..9fec07c 100644 > --- a/async.c > +++ b/async.c > @@ -203,6 +203,8 @@ aio_ctx_finalize(GSource *source) > thread_pool_free(ctx->thread_pool); > aio_set_event_notifier(ctx, &ctx->notifier, NULL); > event_notifier_cleanup(&ctx->notifier); > + qemu_cond_destroy(&ctx->acquire_cond); > + qemu_mutex_destroy(&ctx->acquire_lock); > qemu_mutex_destroy(&ctx->bh_lock); > g_array_free(ctx->pollfds, TRUE); > } > @@ -240,6 +242,9 @@ AioContext *aio_context_new(void) > ctx->pollfds = g_array_new(FALSE, FALSE, sizeof(GPollFD)); > ctx->thread_pool = NULL; > qemu_mutex_init(&ctx->bh_lock); > + qemu_mutex_init(&ctx->acquire_lock); > + qemu_cond_init(&ctx->acquire_cond); > + ctx->owner = NULL; > event_notifier_init(&ctx->notifier, false); > aio_set_event_notifier(ctx, &ctx->notifier, > (EventNotifierHandler *) > @@ -257,3 +262,25 @@ void aio_context_unref(AioContext *ctx) > { > g_source_unref(&ctx->source); > } > + > +void aio_context_acquire(AioContext *ctx) > +{ > + qemu_mutex_lock(&ctx->acquire_lock); > + while (ctx->owner) { > + assert(!qemu_thread_is_self(ctx->owner)); > + aio_notify(ctx); /* kick current owner */ > + qemu_cond_wait(&ctx->acquire_cond, &ctx->acquire_lock); > + } > + qemu_thread_get_self(&ctx->owner_thread); > + ctx->owner = &ctx->owner_thread; > + qemu_mutex_unlock(&ctx->acquire_lock); > +} > + > +void aio_context_release(AioContext *ctx) > +{ > + qemu_mutex_lock(&ctx->acquire_lock); > + assert(ctx->owner && qemu_thread_is_self(ctx->owner)); > + ctx->owner = NULL; > + qemu_cond_signal(&ctx->acquire_cond); > + qemu_mutex_unlock(&ctx->acquire_lock); > +} > diff --git a/tests/test-aio.c b/tests/test-aio.c > index 1ab5637..324c099 100644 > --- a/tests/test-aio.c > +++ b/tests/test-aio.c > @@ -88,6 
+88,63 @@ static void test_notify(void) > g_assert(!aio_poll(ctx, false)); > } > > +typedef struct { > + QemuMutex start_lock; > + bool thread_acquired; > +} AcquireTestData; > + > +static void *test_acquire_thread(void *opaque) > +{ > + AcquireTestData *data = opaque; > + > + /* Wait for other thread to let us start */ > + qemu_mutex_lock(&data->start_lock); > + qemu_mutex_unlock(&data->start_lock); > + > + aio_context_acquire(ctx); > + aio_context_release(ctx); > + > + data->thread_acquired = true; /* success, we got here */ > + > + return NULL; > +} > + > +static void dummy_notifier_read(EventNotifier *unused) > +{ > + g_assert(false); /* should never be invoked */ > +} > + > +static void test_acquire(void) > +{ > + QemuThread thread; > + EventNotifier notifier; > + AcquireTestData data; > + > + /* Dummy event notifier ensures aio_poll() will block */ > + event_notifier_init(&notifier, false); > + aio_set_event_notifier(ctx, &notifier, dummy_notifier_read); > + g_assert(!aio_poll(ctx, false)); /* consume aio_notify() */ > + > + qemu_mutex_init(&data.start_lock); > + qemu_mutex_lock(&data.start_lock); > + data.thread_acquired = false; > + > + qemu_thread_create(&thread, test_acquire_thread, > + &data, QEMU_THREAD_JOINABLE); > + > + /* Block in aio_poll(), let other thread kick us and acquire context */ > + aio_context_acquire(ctx); > + qemu_mutex_unlock(&data.start_lock); /* let the thread run */ > + g_assert(!aio_poll(ctx, true)); > + aio_context_release(ctx); > + > + qemu_thread_join(&thread); > + aio_set_event_notifier(ctx, &notifier, NULL); > + event_notifier_cleanup(&notifier); > + > + g_assert(data.thread_acquired); > +} > + > static void test_bh_schedule(void) > { > BHTestData data = { .n = 0 }; > @@ -639,6 +696,7 @@ int main(int argc, char **argv) > > g_test_init(&argc, &argv, NULL); > g_test_add_func("/aio/notify", test_notify); > + g_test_add_func("/aio/acquire", test_acquire); > g_test_add_func("/aio/bh/schedule", test_bh_schedule); > 
g_test_add_func("/aio/bh/schedule10", test_bh_schedule10); > g_test_add_func("/aio/bh/cancel", test_bh_cancel); >