Since commit f9fc8932b1 ("thread-posix: remove the posix semaphore support", 2022-04-06), QemuSemaphore has its own mutex and condition variable; this adds unnecessary overhead on I/O with small block sizes.
Check the QTAILQ directly instead of adding the indirection of a semaphore's count. Using a semaphore has not been necessary since qemu_cond_timedwait was introduced. The new code has to be careful about spurious wakeups, but it is simpler overall; for example, thread_pool_cancel no longer has to worry about keeping the semaphore count in sync with the number of elements in pool->request_list.

Note that the return value of qemu_cond_timedwait (0 for timeout, 1 for signal or spurious wakeup) is different from that of qemu_sem_timedwait (-1 for timeout, 0 for success).

Reported-by: Lukáš Doktor <ldok...@redhat.com>
Suggested-by: Stefan Hajnoczi <stefa...@redhat.com>
Signed-off-by: Paolo Bonzini <pbonz...@redhat.com>
---
 util/thread-pool.c | 30 +++++++++++-------------------
 1 file changed, 11 insertions(+), 19 deletions(-)

diff --git a/util/thread-pool.c b/util/thread-pool.c index d763cea505..ccdfc82d71 100644 --- a/util/thread-pool.c +++ b/util/thread-pool.c @@ -57,7 +57,7 @@ struct ThreadPool { QEMUBH *completion_bh; QemuMutex lock; QemuCond worker_stopped; - QemuSemaphore sem; + QemuCond request_cond; int max_threads; QEMUBH *new_thread_bh; @@ -85,15 +85,14 @@ static void *worker_thread(void *opaque) ThreadPoolElement *req; int ret; - do { + if (QTAILQ_EMPTY(&pool->request_list)) { pool->idle_threads++; - qemu_mutex_unlock(&pool->lock); - ret = qemu_sem_timedwait(&pool->sem, 10000); - qemu_mutex_lock(&pool->lock); + ret = qemu_cond_timedwait(&pool->request_cond, &pool->lock, 10000); pool->idle_threads--; - } while (ret == -1 && !QTAILQ_EMPTY(&pool->request_list)); - if (ret == -1 || pool->stopping) { - break; + if (!ret && QTAILQ_EMPTY(&pool->request_list)) { + break; + } + continue; } req = QTAILQ_FIRST(&pool->request_list); @@ -211,13 +210,7 @@ static void thread_pool_cancel(BlockAIOCB *acb) trace_thread_pool_cancel(elem, elem->common.opaque); QEMU_LOCK_GUARD(&pool->lock); - if (elem->state == THREAD_QUEUED && - /* No thread has yet started working on elem. 
we can try to "steal" - * the item from the worker if we can get a signal from the - * semaphore. Because this is non-blocking, we can do it with - * the lock taken and ensure that elem will remain THREAD_QUEUED. - */ - qemu_sem_timedwait(&pool->sem, 0) == 0) { + if (elem->state == THREAD_QUEUED) { QTAILQ_REMOVE(&pool->request_list, elem, reqs); qemu_bh_schedule(pool->completion_bh); @@ -261,8 +254,8 @@ BlockAIOCB *thread_pool_submit_aio(ThreadPool *pool, spawn_thread(pool); } QTAILQ_INSERT_TAIL(&pool->request_list, req, reqs); + qemu_cond_signal(&pool->request_cond); qemu_mutex_unlock(&pool->lock); - qemu_sem_post(&pool->sem); return &req->common; } @@ -305,7 +298,7 @@ static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx) pool->completion_bh = aio_bh_new(ctx, thread_pool_completion_bh, pool); qemu_mutex_init(&pool->lock); qemu_cond_init(&pool->worker_stopped); - qemu_sem_init(&pool->sem, 0); + qemu_cond_init(&pool->request_cond); pool->max_threads = 64; pool->new_thread_bh = aio_bh_new(ctx, spawn_thread_bh_fn, pool); @@ -337,15 +330,14 @@ void thread_pool_free(ThreadPool *pool) /* Wait for worker threads to terminate */ pool->stopping = true; + qemu_cond_broadcast(&pool->request_cond); while (pool->cur_threads > 0) { - qemu_sem_post(&pool->sem); qemu_cond_wait(&pool->worker_stopped, &pool->lock); } qemu_mutex_unlock(&pool->lock); qemu_bh_delete(pool->completion_bh); - qemu_sem_destroy(&pool->sem); qemu_cond_destroy(&pool->worker_stopped); qemu_mutex_destroy(&pool->lock); g_free(pool); -- 2.35.1