On Sat, Jun 15, 2013 at 7:16 PM, Paolo Bonzini <pbonz...@redhat.com> wrote:
> On 15/06/2013 04:43, liu ping fan wrote:
>> On Sat, Jun 15, 2013 at 1:35 PM, Paolo Bonzini <pbonz...@redhat.com> wrote:
>>> On 13/06/2013 22:55, Liu Ping Fan wrote:
>>>> BH will be used outside the big lock, so introduce a lock to protect it.
>>>> Note that the lock only affects the writer; the bh's callback does
>>>> not take this extra lock.
>>>>
>>>> Signed-off-by: Liu Ping Fan <pingf...@linux.vnet.ibm.com>
>>>> ---
>>>>  async.c             | 10 +++++++++-
>>>>  include/block/aio.h |  2 ++
>>>>  2 files changed, 11 insertions(+), 1 deletion(-)
>>>>
>>>> diff --git a/async.c b/async.c
>>>> index 90fe906..b1dcead 100644
>>>> --- a/async.c
>>>> +++ b/async.c
>>>> @@ -47,8 +47,11 @@ QEMUBH *aio_bh_new(AioContext *ctx, QEMUBHFunc *cb, void *opaque)
>>>>      bh->ctx = ctx;
>>>>      bh->cb = cb;
>>>>      bh->opaque = opaque;
>>>> +    qemu_mutex_lock(&ctx->bh_lock);
>>>>      bh->next = ctx->first_bh;
>>>> +    smp_wmb();
>>>>      ctx->first_bh = bh;
>>>> +    qemu_mutex_unlock(&ctx->bh_lock);
>>>>      return bh;
>>>>  }
>>>>
>>>> @@ -60,7 +63,9 @@ int aio_bh_poll(AioContext *ctx)
>>>>      ctx->walking_bh++;
>>>>
>>>>      ret = 0;
>>>> -    for (bh = ctx->first_bh; bh; bh = next) {
>>>> +    bh = ctx->first_bh;
>>>> +    smp_rmb();
>>>> +    for (; bh; bh = next) {
>>>>          next = bh->next;
>>>
>>> The read barrier in theory should go inside the loop.  On the other
>>
>> For read after more than 1 writer?
>
> Yes.
>
>>> hand, it only needs to be a smp_read_barrier_depends().
>>>
>> Ok, so it will depend on the ops introduced in your urcu patch series.
>
> Well, it's just the Alpha that needs the barrier, so it could even be
> commented for now.  But...
>
>>>>          if (!bh->deleted && bh->scheduled) {
>>>>              bh->scheduled = 0;
>>>> @@ -75,6 +80,7 @@ int aio_bh_poll(AioContext *ctx)
>>>>
>>>>      /* remove deleted bhs */
>>>>      if (!ctx->walking_bh) {
>>>> +        qemu_mutex_lock(&ctx->bh_lock);
>
> ... I'm not sure that this works yet.  I see two problems:
> ctx->walking_bh needs to be accessed atomically, and while you are doing
> the deletions somebody could come up and start a read.  Havoc.
>
Sorry, could you elaborate on this? I guess you plan to have thread
pools run all of the AioContexts, so the reader and the deleter can
occur in parallel, right?
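
(For illustration, a rough standalone sketch of the publish/read pattern the
patch relies on: writers serialize on the mutex and publish the new list head
with a release store, while the reader walks the list without the lock.  It
uses C11 atomics in place of QEMU's qemu_mutex_*/smp_wmb()/smp_rmb(), and all
names in it are made up for the example.)

/* sketch.c -- standalone illustration, not QEMU code. */
#include <pthread.h>
#include <stdatomic.h>
#include <stdio.h>
#include <stdlib.h>

typedef struct Node {
    void (*cb)(void *opaque);
    void *opaque;
    int scheduled;
    struct Node *next;
} Node;

/* Stand-ins for ctx->first_bh and ctx->bh_lock in the patch. */
static _Atomic(Node *) list_head;
static pthread_mutex_t list_lock = PTHREAD_MUTEX_INITIALIZER;

/* Writer side (aio_bh_new in the patch): fill in the node first, then
 * publish it.  The release store plays the role of smp_wmb() followed by
 * the assignment to ctx->first_bh; the mutex only serializes writers.    */
static Node *publish(void (*cb)(void *), void *opaque)
{
    Node *n = calloc(1, sizeof(*n));
    n->cb = cb;
    n->opaque = opaque;
    n->scheduled = 1;

    pthread_mutex_lock(&list_lock);
    n->next = atomic_load_explicit(&list_head, memory_order_relaxed);
    atomic_store_explicit(&list_head, n, memory_order_release);
    pthread_mutex_unlock(&list_lock);
    return n;
}

/* Reader side (aio_bh_poll): no lock.  The consume load pairs with the
 * release store above; only loads that depend on the pointer need to be
 * ordered, which is why smp_read_barrier_depends() is enough on the read
 * side (and a no-op everywhere except the Alpha).                        */
static void poll_list(void)
{
    Node *n = atomic_load_explicit(&list_head, memory_order_consume);
    for (; n; n = n->next) {
        if (n->scheduled) {
            n->scheduled = 0;
            n->cb(n->opaque);
        }
    }
}

static void hello(void *opaque)
{
    printf("bh ran: %s\n", (const char *)opaque);
}

int main(void)
{
    publish(hello, "first");
    publish(hello, "second");
    poll_list();
    return 0;
}
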
Thanks and regards,
Pingfan

> It's not an easy problem, because even with RCU the bottom half callback
> should run _outside_ the RCU critical section.  I suggest that you hunt
> through the kernel for something that is trying to do the same.
>
> One idea could be to have a separate list for scheduled bottom halves.
> You can walk this list destructively, so you have no problem releasing
> the RCU critical section while invoking the callbacks.  Again, the
> kernel or the urcu data structure library can help (urcu has an RCU-based
> FIFO).
>
> You can track whether there are any deleted BHs, and if so do a call_rcu
> at the end of aio_bh_poll.  You do not delete scheduled BHs; instead, let
> the next aio_bh_poll remove them from the scheduled list and do another
> call_rcu.
>
> Thanks for doing this work, we need "textbook examples" of how to do
> this, that we can perhaps later apply to timers and all that.
>
> Paolo
>
>>>>          bhp = &ctx->first_bh;
>>>>          while (*bhp) {
>>>>              bh = *bhp;
>>>> @@ -85,6 +91,7 @@ int aio_bh_poll(AioContext *ctx)
>>>>                  bhp = &bh->next;
>>>>              }
>>>>          }
>>>> +        qemu_mutex_unlock(&ctx->bh_lock);
>>>>      }
>>>>
>>>>      return ret;
>>>> @@ -211,6 +218,7 @@ AioContext *aio_context_new(void)
>>>>      ctx = (AioContext *) g_source_new(&aio_source_funcs, sizeof(AioContext));
>>>>      ctx->pollfds = g_array_new(FALSE, FALSE, sizeof(GPollFD));
>>>>      ctx->thread_pool = NULL;
>>>> +    qemu_mutex_init(&ctx->bh_lock);
>>>>      event_notifier_init(&ctx->notifier, false);
>>>>      aio_set_event_notifier(ctx, &ctx->notifier,
>>>>                             (EventNotifierHandler *)
>>>> diff --git a/include/block/aio.h b/include/block/aio.h
>>>> index 1836793..a65d16f 100644
>>>> --- a/include/block/aio.h
>>>> +++ b/include/block/aio.h
>>>> @@ -17,6 +17,7 @@
>>>>  #include "qemu-common.h"
>>>>  #include "qemu/queue.h"
>>>>  #include "qemu/event_notifier.h"
>>>> +#include "qemu/thread.h"
>>>>
>>>>  typedef struct BlockDriverAIOCB BlockDriverAIOCB;
>>>>  typedef void BlockDriverCompletionFunc(void *opaque, int ret);
>>>> @@ -53,6 +54,7 @@ typedef struct AioContext {
>>>>       */
>>>>      int walking_handlers;
>>>>
>>>> +    QemuMutex bh_lock;
>>>>      /* Anchor of the list of Bottom Halves belonging to the context */
>>>>      struct QEMUBH *first_bh;
>>>>
>>>>
>>>
>>
>>
>
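
(Below is a minimal standalone sketch of the "separate list for scheduled
bottom halves" idea Paolo describes above: scheduling pushes onto a lock-free
list, and the poll side detaches the whole list with an atomic exchange and
walks it destructively, so the callbacks run outside any critical section.
All names are invented for the example, and the call_rcu reclamation of
deleted BHs is only indicated in a comment.)

/* sched-sketch.c -- standalone illustration, not QEMU code. */
#include <stdatomic.h>
#include <stdio.h>

typedef struct SchedBH {
    void (*cb)(void *opaque);
    void *opaque;
    struct SchedBH *next;
} SchedBH;

/* Head of the hypothetical "scheduled" list, kept separate from the list
 * of all bottom halves.                                                   */
static _Atomic(SchedBH *) sched_head;

/* qemu_bh_schedule() analogue: push onto the scheduled list.  Safe for
 * multiple producers thanks to the compare-and-swap loop.                 */
static void sched_push(SchedBH *bh)
{
    SchedBH *old = atomic_load_explicit(&sched_head, memory_order_relaxed);
    do {
        bh->next = old;
    } while (!atomic_compare_exchange_weak_explicit(&sched_head, &old, bh,
                                                    memory_order_release,
                                                    memory_order_relaxed));
}

/* aio_bh_poll() analogue: detach everything currently scheduled and walk
 * the private list.  No lock or RCU read-side critical section is held
 * while the callbacks run, which is the property asked for above.  Deleted
 * BHs are not freed here; the real design would defer their reclamation
 * with call_rcu after this walk.                                          */
static int sched_poll(void)
{
    SchedBH *bh = atomic_exchange_explicit(&sched_head, NULL,
                                           memory_order_acquire);
    int ret = 0;

    while (bh) {
        SchedBH *next = bh->next;
        bh->cb(bh->opaque);
        ret = 1;
        bh = next;
    }
    return ret;
}

static void hello(void *opaque)
{
    printf("bh: %s\n", (const char *)opaque);
}

int main(void)
{
    SchedBH a = { hello, "a", NULL };
    SchedBH b = { hello, "b", NULL };

    sched_push(&a);
    sched_push(&b);
    return sched_poll() ? 0 : 1;
}

Note that the detached list comes out in LIFO order; a real implementation
would either reverse it or use a proper FIFO such as the urcu one mentioned
above.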