Il 06/03/2013 15:53, Stefan Hajnoczi ha scritto:
> CoQueue uses a BH to awake coroutines that were made ready to run again
> using qemu_co_queue_next() or qemu_co_queue_restart_all(). The BH
> currently runs in the iothread AioContext and would break coroutines
> that run in a different AioContext.
>
> This is a slightly tricky problem because the lifetime of the BH exceeds
> that of the CoQueue. This means coroutines can be awoken after CoQueue
> itself has been freed. Also, there is no qemu_co_queue_destroy()
> function which we could use to handle freeing resources.
>
> Introducing qemu_co_queue_destroy() has a ripple effect of requiring us
> to also add qemu_co_mutex_destroy() and qemu_co_rwlock_destroy(), as
> well as updating all callers. Avoid doing that.
>
> We also cannot switch from BH to GIdle function because aio_poll() does
> not dispatch GIdle functions. (GIdle functions make memory management
> slightly easier because they free themselves.)
>
> Finally, I don't want to move unlock_bh_queue and unlock_bh into
> AioContext. That would break encapsulation - AioContext isn't supposed
> to know about CoQueue.
>
> This patch implements a different solution: each qemu_co_queue_next() or
> qemu_co_queue_restart_all() call creates a new BH and list of coroutines
> to wake up. Callers tend to invoke qemu_co_queue_next() and
> qemu_co_queue_restart_all() occasionally after blocking I/O, so creating
> a new BH for each call shouldn't be massively inefficient.
>
> Note that this patch does not add an interface for specifying the
> AioContext. That is left to future patches which will convert CoQueue,
> CoMutex, and CoRwlock to expose AioContext.
>
> Signed-off-by: Stefan Hajnoczi <stefa...@redhat.com>
> ---
>  include/block/coroutine.h |  1 +
>  qemu-coroutine-lock.c     | 59 ++++++++++++++++++++++++++++++++---------------
>  2 files changed, 42 insertions(+), 18 deletions(-)
>
> diff --git a/include/block/coroutine.h b/include/block/coroutine.h
> index c31fae3..a978162 100644
> --- a/include/block/coroutine.h
> +++ b/include/block/coroutine.h
> @@ -104,6 +104,7 @@ bool qemu_in_coroutine(void);
>   */
>  typedef struct CoQueue {
>      QTAILQ_HEAD(, Coroutine) entries;
> +    AioContext *ctx;
>  } CoQueue;
>
>  /**
> diff --git a/qemu-coroutine-lock.c b/qemu-coroutine-lock.c
> index 97ef01c..ae986b3 100644
> --- a/qemu-coroutine-lock.c
> +++ b/qemu-coroutine-lock.c
> @@ -29,28 +29,34 @@
>  #include "block/aio.h"
>  #include "trace.h"
>
> -static QTAILQ_HEAD(, Coroutine) unlock_bh_queue =
> -    QTAILQ_HEAD_INITIALIZER(unlock_bh_queue);
> -static QEMUBH* unlock_bh;
> +/* Coroutines are awoken from a BH to allow the current coroutine to complete
> + * its flow of execution. The BH may run after the CoQueue has been destroyed,
> + * so keep BH data in a separate heap-allocated struct.
> + */
> +typedef struct {
> +    QEMUBH *bh;
> +    QTAILQ_HEAD(, Coroutine) entries;
> +} CoQueueNextData;
>
>  static void qemu_co_queue_next_bh(void *opaque)
>  {
> +    CoQueueNextData *data = opaque;
>      Coroutine *next;
>
>      trace_qemu_co_queue_next_bh();
> -    while ((next = QTAILQ_FIRST(&unlock_bh_queue))) {
> -        QTAILQ_REMOVE(&unlock_bh_queue, next, co_queue_next);
> +    while ((next = QTAILQ_FIRST(&data->entries))) {
> +        QTAILQ_REMOVE(&data->entries, next, co_queue_next);
>          qemu_coroutine_enter(next, NULL);
>      }
> +
> +    qemu_bh_delete(data->bh);
> +    g_slice_free(CoQueueNextData, data);
>  }
>
>  void qemu_co_queue_init(CoQueue *queue)
>  {
>      QTAILQ_INIT(&queue->entries);
> -
> -    if (!unlock_bh) {
> -        unlock_bh = qemu_bh_new(qemu_co_queue_next_bh, NULL);
> -    }
> +    queue->ctx = NULL;

What about adding an accessor for qemu_aio_context and using it here
instead of NULL? Then you can just use aio_bh_new() in
qemu_co_queue_do_restart(), with no special case for a NULL ctx.

Paolo

>  }
>
>  void coroutine_fn qemu_co_queue_wait(CoQueue *queue)
> @@ -69,26 +75,43 @@ void coroutine_fn qemu_co_queue_wait_insert_head(CoQueue *queue)
>      assert(qemu_in_coroutine());
>  }
>
> -bool qemu_co_queue_next(CoQueue *queue)
> +static bool qemu_co_queue_do_restart(CoQueue *queue, bool single)
>  {
>      Coroutine *next;
> +    CoQueueNextData *data;
> +
> +    if (QTAILQ_EMPTY(&queue->entries)) {
> +        return false;
> +    }
>
> -    next = QTAILQ_FIRST(&queue->entries);
> -    if (next) {
> +    data = g_slice_new(CoQueueNextData);
> +    if (queue->ctx) {
> +        data->bh = aio_bh_new(queue->ctx, qemu_co_queue_next_bh, data);
> +    } else {
> +        data->bh = qemu_bh_new(qemu_co_queue_next_bh, data);
> +    }
> +    QTAILQ_INIT(&data->entries);
> +    qemu_bh_schedule(data->bh);
> +
> +    while ((next = QTAILQ_FIRST(&queue->entries)) != NULL) {
>          QTAILQ_REMOVE(&queue->entries, next, co_queue_next);
> -        QTAILQ_INSERT_TAIL(&unlock_bh_queue, next, co_queue_next);
> +        QTAILQ_INSERT_TAIL(&data->entries, next, co_queue_next);
>          trace_qemu_co_queue_next(next);
> -        qemu_bh_schedule(unlock_bh);
> +        if (single) {
> +            break;
> +        }
>      }
> +    return true;
> +}
>
> -    return (next != NULL);
> +bool qemu_co_queue_next(CoQueue *queue)
> +{
> +    return qemu_co_queue_do_restart(queue, true);
>  }
>
>  void qemu_co_queue_restart_all(CoQueue *queue)
>  {
> -    while (qemu_co_queue_next(queue)) {
> -        /* Do nothing */
> -    }
> +    qemu_co_queue_do_restart(queue, false);
>  }
>
>  bool qemu_co_queue_empty(CoQueue *queue)
>