Divide the fields of AioTaskPool into IN and Status. Introduce a CoQueue in place of the .waiting flag to take care of suspending and resuming the calling coroutine, and add a lock to protect accesses to the busy_tasks counter and to the AioTask .ret field.
Signed-off-by: Emanuele Giuseppe Esposito <[email protected]> --- block/aio_task.c | 63 ++++++++++++++++++++++++---------------- include/block/aio_task.h | 2 +- 2 files changed, 39 insertions(+), 26 deletions(-) diff --git a/block/aio_task.c b/block/aio_task.c index 88989fa248..7ac6b5dd72 100644 --- a/block/aio_task.c +++ b/block/aio_task.c @@ -27,62 +27,70 @@ #include "block/aio_task.h" struct AioTaskPool { - Coroutine *main_co; - int status; + /* IN: just set in aio_task_pool_new and never modified */ int max_busy_tasks; + + /* Status: either atomic or protected by the lock */ + int status; int busy_tasks; - bool waiting; + CoQueue queue; + CoMutex lock; }; static void coroutine_fn aio_task_co(void *opaque) { + int ret; AioTask *task = opaque; AioTaskPool *pool = task->pool; - assert(pool->busy_tasks < pool->max_busy_tasks); - pool->busy_tasks++; + WITH_QEMU_LOCK_GUARD(&pool->lock) { + assert(pool->busy_tasks < pool->max_busy_tasks); + pool->busy_tasks++; - task->ret = task->func(task); + ret = task->func(task); + task->ret = ret; - pool->busy_tasks--; + pool->busy_tasks--; + } - if (task->ret < 0 && pool->status == 0) { - pool->status = task->ret; + if (ret < 0) { + qatomic_cmpxchg(&pool->status, 0, ret); } g_free(task); - if (pool->waiting) { - pool->waiting = false; - aio_co_wake(pool->main_co); - } + qemu_co_queue_next(&pool->queue); } -void coroutine_fn aio_task_pool_wait_one(AioTaskPool *pool) +/* Called with lock held */ +static void coroutine_fn aio_task_pool_wait_one_unlocked(AioTaskPool *pool) { assert(pool->busy_tasks > 0); - assert(qemu_coroutine_self() == pool->main_co); - - pool->waiting = true; - qemu_coroutine_yield(); - - assert(!pool->waiting); + qemu_co_queue_wait(&pool->queue, &pool->lock); assert(pool->busy_tasks < pool->max_busy_tasks); } +void coroutine_fn aio_task_pool_wait_one(AioTaskPool *pool) +{ + QEMU_LOCK_GUARD(&pool->lock); + aio_task_pool_wait_one_unlocked(pool); +} + void coroutine_fn aio_task_pool_wait_slot(AioTaskPool *pool) { + 
QEMU_LOCK_GUARD(&pool->lock); if (pool->busy_tasks < pool->max_busy_tasks) { return; } - aio_task_pool_wait_one(pool); + aio_task_pool_wait_one_unlocked(pool); } void coroutine_fn aio_task_pool_wait_all(AioTaskPool *pool) { + QEMU_LOCK_GUARD(&pool->lock); while (pool->busy_tasks > 0) { - aio_task_pool_wait_one(pool); + aio_task_pool_wait_one_unlocked(pool); } } @@ -98,8 +106,8 @@ AioTaskPool *coroutine_fn aio_task_pool_new(int max_busy_tasks) { AioTaskPool *pool = g_new0(AioTaskPool, 1); - pool->main_co = qemu_coroutine_self(); pool->max_busy_tasks = max_busy_tasks; + qemu_co_queue_init(&pool->queue); return pool; } @@ -115,10 +123,15 @@ int aio_task_pool_status(AioTaskPool *pool) return 0; /* Sugar for lazy allocation of aio pool */ } - return pool->status; + return qatomic_read(&pool->status); } bool aio_task_pool_empty(AioTaskPool *pool) { - return pool->busy_tasks == 0; + int tasks; + + qemu_co_mutex_lock(&pool->lock); + tasks = pool->busy_tasks; + qemu_co_mutex_unlock(&pool->lock); + return tasks == 0; } diff --git a/include/block/aio_task.h b/include/block/aio_task.h index 50bc1e1817..b22a4310aa 100644 --- a/include/block/aio_task.h +++ b/include/block/aio_task.h @@ -33,7 +33,7 @@ typedef int coroutine_fn (*AioTaskFunc)(AioTask *task); struct AioTask { AioTaskPool *pool; AioTaskFunc func; - int ret; + int ret; /* atomic */ }; AioTaskPool *coroutine_fn aio_task_pool_new(int max_busy_tasks); -- 2.30.2
