"Maciej S. Szmigiero" <m...@maciej.szmigiero.name> writes: > On 3.09.2024 00:07, Fabiano Rosas wrote: >> "Maciej S. Szmigiero" <m...@maciej.szmigiero.name> writes: >> >>> From: "Maciej S. Szmigiero" <maciej.szmigi...@oracle.com> >>> >>> Migration code wants to manage device data sending threads in one place. >>> >>> QEMU has an existing thread pool implementation, however it was limited >>> to queuing AIO operations only and essentially had a 1:1 mapping between >>> the current AioContext and the ThreadPool in use. >>> >>> Implement what is necessary to queue generic (non-AIO) work on a ThreadPool >>> too. >>> >>> This brings a few new operations on a pool: >>> * thread_pool_set_minmax_threads() explicitly sets the minimum and maximum >>> thread count in the pool. >>> >>> * thread_pool_join() operation waits until all the submitted work requests >>> have finished. >>> >>> * thread_pool_poll() lets the new thread and / or thread completion bottom >>> halves run (if they are indeed scheduled to be run). >>> It is useful for thread pool users that need to launch or terminate new >>> threads without returning to the QEMU main loop. >>> >>> Signed-off-by: Maciej S. Szmigiero <maciej.szmigi...@oracle.com> >>> --- >>> include/block/thread-pool.h | 10 ++++- >>> tests/unit/test-thread-pool.c | 2 +- >>> util/thread-pool.c | 77 ++++++++++++++++++++++++++++++----- >>> 3 files changed, 76 insertions(+), 13 deletions(-) >>> >>> diff --git a/include/block/thread-pool.h b/include/block/thread-pool.h >>> index b484c4780ea6..1769496056cd 100644 >>> --- a/include/block/thread-pool.h >>> +++ b/include/block/thread-pool.h >>> @@ -37,9 +37,15 @@ BlockAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func, >>> void *arg, GDestroyNotify arg_destroy, >>> BlockCompletionFunc *cb, void *opaque); >>> int coroutine_fn thread_pool_submit_co(ThreadPoolFunc *func, void *arg); >>> -void thread_pool_submit(ThreadPoolFunc *func, >>> - void *arg, GDestroyNotify arg_destroy); >>> +BlockAIOCB *thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, >>> + void *arg, GDestroyNotify arg_destroy, >>> + BlockCompletionFunc *cb, void *opaque); >> >> These kinds of changes (create wrappers, change signatures, etc), could >> be in their own patch as it's just code motion that should not have >> functional impact. The "no_requests" stuff would be better discussed in >> a separate patch. > > These changes *all* should have no functional impact on existing callers. > > But I get your overall point, will try to separate these really trivial > parts.
Yeah, I guess I meant that one set of changes has a larger potential for
introducing a bug, while the other is clearly harmless.

>
>>>
>>> +void thread_pool_join(ThreadPool *pool);
>>> +void thread_pool_poll(ThreadPool *pool);
>>> +
>>> +void thread_pool_set_minmax_threads(ThreadPool *pool,
>>> +                                    int min_threads, int max_threads);
>>>  void thread_pool_update_params(ThreadPool *pool, struct AioContext *ctx);
>>>
>>>  #endif
>>> diff --git a/tests/unit/test-thread-pool.c b/tests/unit/test-thread-pool.c
>>> index e4afb9e36292..469c0f7057b6 100644
>>> --- a/tests/unit/test-thread-pool.c
>>> +++ b/tests/unit/test-thread-pool.c
>>> @@ -46,7 +46,7 @@ static void done_cb(void *opaque, int ret)
>>>  static void test_submit(void)
>>>  {
>>>      WorkerTestData data = { .n = 0 };
>>> -    thread_pool_submit(worker_cb, &data, NULL);
>>> +    thread_pool_submit(NULL, worker_cb, &data, NULL, NULL, NULL);
>>>      while (data.n == 0) {
>>>          aio_poll(ctx, true);
>>>      }
>>> diff --git a/util/thread-pool.c b/util/thread-pool.c
>>> index 69a87ee79252..2bf3be875a51 100644
>>> --- a/util/thread-pool.c
>>> +++ b/util/thread-pool.c
>>> @@ -60,6 +60,7 @@ struct ThreadPool {
>>>      QemuMutex lock;
>>>      QemuCond worker_stopped;
>>>      QemuCond request_cond;
>>> +    QemuCond no_requests_cond;
>>>      QEMUBH *new_thread_bh;
>>>
>>>      /* The following variables are only accessed from one AioContext. */
>>> @@ -73,6 +74,7 @@ struct ThreadPool {
>>>      int pending_threads; /* threads created but not running yet */
>>>      int min_threads;
>>>      int max_threads;
>>> +    size_t requests_executing;
>>
>> What's with size_t? Should this be a uint32_t instead?
>
> Sizes of objects are normally size_t, since otherwise bad
> things happen if objects are bigger than 4 GiB.

Ok, but requests_executing is not the size of an object. It's the number
of objects in a linked list that satisfy a certain predicate. There are
no address space size considerations here.

> Considering that the minimum object size is 1 byte, the
> max count of distinct objects also needs a size_t to not
> risk an overflow.

I'm not sure I get you, there's no overflow since you're bounds checking
with the assert. Or is this a more abstract line of thought about how
many ThreadPoolElements can be present in memory at a time and you'd
like a type that's certain to fit the theoretical amount of objects?

> I think that while 2^32 requests executing seems unlikely,
> saving 4 bytes seems not worth worrying that someone will
> find a vulnerability triggered by overflowing a 32-bit
> variable (not necessarily in the migration code, but in some
> other thread pool user).
>
>>>  };
>>>
>>>  static void *worker_thread(void *opaque)
>>> @@ -107,6 +109,10 @@ static void *worker_thread(void *opaque)
>>>          req = QTAILQ_FIRST(&pool->request_list);
>>>          QTAILQ_REMOVE(&pool->request_list, req, reqs);
>>>          req->state = THREAD_ACTIVE;
>>> +
>>> +        assert(pool->requests_executing < SIZE_MAX);
>>> +        pool->requests_executing++;
>>> +
>>>          qemu_mutex_unlock(&pool->lock);
>>>
>>>          ret = req->func(req->arg);
>>> @@ -118,6 +124,14 @@ static void *worker_thread(void *opaque)
>>>
>>>          qemu_bh_schedule(pool->completion_bh);
>>>          qemu_mutex_lock(&pool->lock);
>>> +
>>> +        assert(pool->requests_executing > 0);
>>> +        pool->requests_executing--;
>>> +
>>> +        if (pool->requests_executing == 0 &&
>>> +            QTAILQ_EMPTY(&pool->request_list)) {
>>> +            qemu_cond_signal(&pool->no_requests_cond);
>>> +        }
>>
>> An empty requests list and no request in flight means the worker will
>> now exit after the timeout, no?
>> Can you just kick the worker out of the wait and use
>> pool->worker_stopped instead of the new condition variable?
>
> First, all threads won't terminate if either min_threads or max_threads
> isn't 0.

Ah, I overlooked the break condition, never mind.

> It might be in the migration thread pool case, but we are adding a
> generic thread pool, so it should be as universal as possible.
> thread_pool_free() can get away with overwriting these values since
> it is destroying the pool anyway.
>
> Also, the *_join() (or whatever its final name will be) operation is
> about waiting for all requests / work items to finish, not about waiting
> for threads to terminate.

Right, but the idea was to piggyback on the thread termination to infer
the (obvious) end of request servicing. We cannot do that, as you've
explained, fine.

> It's essentially a synchronization point for a thread pool, not a cleanup.
>
>>>      }
>>>
>>>      pool->cur_threads--;
>>> @@ -243,13 +257,16 @@ static const AIOCBInfo thread_pool_aiocb_info = {
>>>      .cancel_async = thread_pool_cancel,
>>>  };
>>>
>>> -BlockAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func,
>>> -                                   void *arg, GDestroyNotify arg_destroy,
>>> -                                   BlockCompletionFunc *cb, void *opaque)
>>> +BlockAIOCB *thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func,
>>> +                               void *arg, GDestroyNotify arg_destroy,
>>> +                               BlockCompletionFunc *cb, void *opaque)
>>>  {
>>>      ThreadPoolElement *req;
>>>      AioContext *ctx = qemu_get_current_aio_context();
>>> -    ThreadPool *pool = aio_get_thread_pool(ctx);
>>> +
>>> +    if (!pool) {
>>> +        pool = aio_get_thread_pool(ctx);
>>> +    }
>>
>> I'd go for a separate implementation to really drive the point that this
>> new usage is different. See the code snippet below.
>
> I see your point there - will split these implementations.
>
>> It seems we're a short step away from being able to use this
>> implementation in a general way. Is there something that can be done
>> with the 'common' field in the ThreadPoolElement?
>
> The non-AIO request flow still needs the completion callback from
> BlockAIOCB (and its argument pointer), so removing the "common" field
> from these requests would need introducing two "flavors" of
> ThreadPoolElement.
>
> Not sure the memory savings here are worth the increase in code complexity.

I'm not asking that of you, but I think it should be done eventually.
The QEMU block layer is very particular and I wouldn't want the
use-cases for the thread-pool to get confused. But I can't see a way out
right now, so let's postpone this, see if anyone else has comments.
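Just so the idea doesn't get lost: here's roughly the two-flavor split I'd
imagine for that eventual cleanup. An untested sketch only - all type and
field names below are invented, carrying over the fields visible in this
patch, and assuming the surrounding util/thread-pool.c context (enum
ThreadState, qemu/queue.h macros):

    /* Bookkeeping common to both request flavors: */
    typedef struct ThreadPoolElementBase {
        ThreadPool *pool;
        ThreadPoolFunc *func;
        void *arg;
        GDestroyNotify arg_destroy;
        enum ThreadState state;
        int ret;
        QTAILQ_ENTRY(ThreadPoolElementBase) reqs;
        QLIST_ENTRY(ThreadPoolElementBase) all;
    } ThreadPoolElementBase;

    /* AIO flavor: keeps the embedded BlockAIOCB for qemu_aio_get() */
    typedef struct ThreadPoolElementAio {
        BlockAIOCB common;
        ThreadPoolElementBase base;
    } ThreadPoolElementAio;

    /* Generic flavor: a bare completion callback, no BlockAIOCB */
    typedef struct ThreadPoolElementNoAio {
        ThreadPoolElementBase base;
        BlockCompletionFunc *cb;
        void *cb_opaque;
    } ThreadPoolElementNoAio;

The pool internals would then only deal with ThreadPoolElementBase, and the
completion bottom half would need a way to tell the flavors apart (e.g. a
flag in the base). Anyway, not for this series.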
>
>> ========
>> static void thread_pool_submit_request(ThreadPool *pool,
>>                                        ThreadPoolElement *req)
>> {
>>     req->state = THREAD_QUEUED;
>>     req->pool = pool;
>>
>>     QLIST_INSERT_HEAD(&pool->head, req, all);
>>
>>     trace_thread_pool_submit(pool, req, req->arg);
>>
>>     qemu_mutex_lock(&pool->lock);
>>     if (pool->idle_threads == 0 && pool->cur_threads < pool->max_threads) {
>>         spawn_thread(pool);
>>     }
>>     QTAILQ_INSERT_TAIL(&pool->request_list, req, reqs);
>>     qemu_mutex_unlock(&pool->lock);
>>     qemu_cond_signal(&pool->request_cond);
>> }
>>
>> BlockAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func, void *arg,
>>                                    BlockCompletionFunc *cb, void *opaque)
>> {
>>     ThreadPoolElement *req;
>>     AioContext *ctx = qemu_get_current_aio_context();
>>     ThreadPool *pool = aio_get_thread_pool(ctx);
>>
>>     /* Assert that the thread submitting work is the same running the pool */
>>     assert(pool->ctx == qemu_get_current_aio_context());
>>
>>     req = qemu_aio_get(&thread_pool_aiocb_info, NULL, cb, opaque);
>>     req->func = func;
>>     req->arg = arg;
>>
>>     thread_pool_submit_request(pool, req);
>>     return &req->common;
>> }
>>
>> void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, void *arg)
>> {
>>     ThreadPoolElement *req;
>>
>>     req = g_malloc(sizeof(ThreadPoolElement));
>>     req->func = func;
>>     req->arg = arg;
>>
>>     thread_pool_submit_request(pool, req);
>> }
>> =================
>>
>>>
>>>      /* Assert that the thread submitting work is the same running the pool */
>>>      assert(pool->ctx == qemu_get_current_aio_context());
>>> @@ -275,6 +292,18 @@ BlockAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func,
>>>      return &req->common;
>>>  }
>>>
>>> +BlockAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func,
>>> +                                   void *arg, GDestroyNotify arg_destroy,
>>> +                                   BlockCompletionFunc *cb, void *opaque)
>>> +{
>>> +    return thread_pool_submit(NULL, func, arg, arg_destroy, cb, opaque);
>>> +}
>>> +
>>> +void thread_pool_poll(ThreadPool *pool)
>>> +{
>>> +    aio_bh_poll(pool->ctx);
>>> +}
>>> +
>>>  typedef struct ThreadPoolCo {
>>>      Coroutine *co;
>>>      int ret;
>>> @@ -297,18 +326,38 @@ int coroutine_fn thread_pool_submit_co(ThreadPoolFunc *func, void *arg)
>>>      return tpc.ret;
>>>  }
>>>
>>> -void thread_pool_submit(ThreadPoolFunc *func,
>>> -                        void *arg, GDestroyNotify arg_destroy)
>>> +void thread_pool_join(ThreadPool *pool)
>>
>> This is misleading because it's about the requests, not the threads in
>> the pool. Compare with what thread_pool_free does:
>>
>>     /* Wait for worker threads to terminate */
>>     pool->max_threads = 0;
>>     qemu_cond_broadcast(&pool->request_cond);
>>     while (pool->cur_threads > 0) {
>>         qemu_cond_wait(&pool->worker_stopped, &pool->lock);
>>     }
>
> I'm open to better naming proposals for thread_pool_join().

thread_pool_wait() might be better.

> Thanks,
> Maciej
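To make that suggestion concrete - a rough, untested sketch only, built on
the no_requests_cond and requests_executing bookkeeping this patch already
adds (thread_pool_wait is just the name proposed above, not existing API):

    /*
     * Wait until no request is queued or executing.
     *
     * A synchronization point, not a cleanup: the workers stay
     * alive and new work may be submitted afterwards.
     */
    void thread_pool_wait(ThreadPool *pool)
    {
        qemu_mutex_lock(&pool->lock);
        while (pool->requests_executing > 0 ||
               !QTAILQ_EMPTY(&pool->request_list)) {
            qemu_cond_wait(&pool->no_requests_cond, &pool->lock);
        }
        qemu_mutex_unlock(&pool->lock);
    }

This mirrors the loop quoted above from thread_pool_free(), except it waits
on the request state rather than on worker termination.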