This patch replace the global coroutine queue with a lock-free stack of which the elements are coroutine queues. Threads can put coroutine queues into the stack or take queues from it and each coroutine queue has exactly POOL_BATCH_SIZE coroutines. Note that the stack is not strictly LIFO, but it's enough for buffer pool.
Coroutines will be put into thread-local pools first while release. Now the fast pathes of both allocation and release are atomic-free, and there won't be too many coroutines remain in a single thread since POOL_BATCH_SIZE has been reduced to 16. In practice, I've run a VM with two block devices binding to two different iothreads, and run fio with iodepth 128 on each device. It maintains around 400 coroutines and has about 1% chance of calling to `qemu_coroutine_new` without this patch. And with this patch, it maintains no more than 273 coroutines and doesn't call `qemu_coroutine_new` after initial allocations. Signed-off-by: wanghonghao <wanghong...@bytedance.com> --- util/qemu-coroutine.c | 63 ++++++++++++++++++++++++++++--------------- 1 file changed, 42 insertions(+), 21 deletions(-) diff --git a/util/qemu-coroutine.c b/util/qemu-coroutine.c index c3caa6c770..9202ec9c85 100644 --- a/util/qemu-coroutine.c +++ b/util/qemu-coroutine.c @@ -21,13 +21,14 @@ #include "block/aio.h" enum { - POOL_BATCH_SIZE = 64, + POOL_BATCH_SIZE = 16, + POOL_MAX_BATCHES = 32, }; -/** Free list to speed up creation */ -static QSLIST_HEAD(, Coroutine) release_pool = QSLIST_HEAD_INITIALIZER(pool); -static unsigned int release_pool_size; -static __thread QSLIST_HEAD(, Coroutine) alloc_pool = QSLIST_HEAD_INITIALIZER(pool); +/** Free stack to speed up creation */ +static QSLIST_HEAD(, Coroutine) pool[POOL_MAX_BATCHES]; +static int pool_top; +static __thread QSLIST_HEAD(, Coroutine) alloc_pool; static __thread unsigned int alloc_pool_size; static __thread Notifier coroutine_pool_cleanup_notifier; @@ -49,20 +50,26 @@ Coroutine *qemu_coroutine_create(CoroutineEntry *entry, void *opaque) if (CONFIG_COROUTINE_POOL) { co = QSLIST_FIRST(&alloc_pool); if (!co) { - if (release_pool_size > POOL_BATCH_SIZE) { - /* Slow path; a good place to register the destructor, too. */ - if (!coroutine_pool_cleanup_notifier.notify) { - coroutine_pool_cleanup_notifier.notify = coroutine_pool_cleanup; - qemu_thread_atexit_add(&coroutine_pool_cleanup_notifier); + int top; + + /* Slow path; a good place to register the destructor, too. */ + if (!coroutine_pool_cleanup_notifier.notify) { + coroutine_pool_cleanup_notifier.notify = coroutine_pool_cleanup; + qemu_thread_atexit_add(&coroutine_pool_cleanup_notifier); + } + + while ((top = atomic_read(&pool_top)) > 0) { + if (atomic_cmpxchg(&pool_top, top, top - 1) != top) { + continue; } - /* This is not exact; there could be a little skew between - * release_pool_size and the actual size of release_pool. But - * it is just a heuristic, it does not need to be perfect. - */ - alloc_pool_size = atomic_xchg(&release_pool_size, 0); - QSLIST_MOVE_ATOMIC(&alloc_pool, &release_pool); + QSLIST_MOVE_ATOMIC(&alloc_pool, &pool[top - 1]); co = QSLIST_FIRST(&alloc_pool); + + if (co) { + alloc_pool_size = POOL_BATCH_SIZE; + break; + } } } if (co) { @@ -86,16 +93,30 @@ static void coroutine_delete(Coroutine *co) co->caller = NULL; if (CONFIG_COROUTINE_POOL) { - if (release_pool_size < POOL_BATCH_SIZE * 2) { - QSLIST_INSERT_HEAD_ATOMIC(&release_pool, co, pool_next); - atomic_inc(&release_pool_size); - return; - } + int top, value, old; + if (alloc_pool_size < POOL_BATCH_SIZE) { QSLIST_INSERT_HEAD(&alloc_pool, co, pool_next); alloc_pool_size++; return; } + + for (top = atomic_read(&pool_top); top < POOL_MAX_BATCHES; top++) { + QSLIST_REPLACE_ATOMIC(&pool[top], &alloc_pool, &alloc_pool); + if (!QSLIST_EMPTY(&alloc_pool)) { + continue; + } + + value = top + 1; + + do { + old = atomic_cmpxchg(&pool_top, top, value); + } while (old != top && (top = old) < value); + + QSLIST_INSERT_HEAD(&alloc_pool, co, pool_next); + alloc_pool_size = 1; + return; + } } qemu_coroutine_delete(co); -- 2.24.3 (Apple Git-128)