"Maciej S. Szmigiero" <m...@maciej.szmigiero.name> writes:

> On 3.09.2024 00:07, Fabiano Rosas wrote:
>> "Maciej S. Szmigiero" <m...@maciej.szmigiero.name> writes:
>> 
>>> From: "Maciej S. Szmigiero" <maciej.szmigi...@oracle.com>
>>>
>>> Migration code wants to manage device data sending threads in one place.
>>>
>>> QEMU has an existing thread pool implementation; however, it was limited
>>> to queuing AIO operations only and essentially had a 1:1 mapping between
>>> the current AioContext and the ThreadPool in use.
>>>
>>> Implement what is necessary to queue generic (non-AIO) work on a ThreadPool
>>> too.
>>>
>>> This brings a few new operations on a pool:
>>> * thread_pool_set_minmax_threads() explicitly sets the minimum and maximum
>>> thread count in the pool.
>>>
>>> * thread_pool_join() operation waits until all the submitted work requests
>>> have finished.
>>>
>>> * thread_pool_poll() lets the new-thread and/or thread-completion bottom
>>> halves run (if they are indeed scheduled to run).
>>> It is useful for thread pool users that need to launch or terminate new
>>> threads without returning to the QEMU main loop.
>>>
>>> Signed-off-by: Maciej S. Szmigiero <maciej.szmigi...@oracle.com>
>>> ---
>>>   include/block/thread-pool.h   | 10 ++++-
>>>   tests/unit/test-thread-pool.c |  2 +-
>>>   util/thread-pool.c            | 77 ++++++++++++++++++++++++++++++-----
>>>   3 files changed, 76 insertions(+), 13 deletions(-)
>>>
>>> diff --git a/include/block/thread-pool.h b/include/block/thread-pool.h
>>> index b484c4780ea6..1769496056cd 100644
>>> --- a/include/block/thread-pool.h
>>> +++ b/include/block/thread-pool.h
>>> @@ -37,9 +37,15 @@ BlockAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func,
>>>                                      void *arg, GDestroyNotify arg_destroy,
>>>                                      BlockCompletionFunc *cb, void *opaque);
>>>   int coroutine_fn thread_pool_submit_co(ThreadPoolFunc *func, void *arg);
>>> -void thread_pool_submit(ThreadPoolFunc *func,
>>> -                        void *arg, GDestroyNotify arg_destroy);
>>> +BlockAIOCB *thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func,
>>> +                               void *arg, GDestroyNotify arg_destroy,
>>> +                               BlockCompletionFunc *cb, void *opaque);
>> 
>> These kinds of changes (creating wrappers, changing signatures, etc.) could
>> be in their own patch, as it's just code motion that should not have
>> functional impact. The "no_requests" stuff would be better discussed in
>> a separate patch.
>
> These changes *all* should have no functional impact on existing callers.
>
> But I get your overall point, will try to separate these really trivial
> parts.

Yeah, I guess I meant that one set of changes has a larger potential for
introducing a bug while the other is clearly harmless.

>
>>>   
>>> +void thread_pool_join(ThreadPool *pool);
>>> +void thread_pool_poll(ThreadPool *pool);
>>> +
>>> +void thread_pool_set_minmax_threads(ThreadPool *pool,
>>> +                                    int min_threads, int max_threads);
>>>   void thread_pool_update_params(ThreadPool *pool, struct AioContext *ctx);
>>>   
>>>   #endif
>>> diff --git a/tests/unit/test-thread-pool.c b/tests/unit/test-thread-pool.c
>>> index e4afb9e36292..469c0f7057b6 100644
>>> --- a/tests/unit/test-thread-pool.c
>>> +++ b/tests/unit/test-thread-pool.c
>>> @@ -46,7 +46,7 @@ static void done_cb(void *opaque, int ret)
>>>   static void test_submit(void)
>>>   {
>>>       WorkerTestData data = { .n = 0 };
>>> -    thread_pool_submit(worker_cb, &data, NULL);
>>> +    thread_pool_submit(NULL, worker_cb, &data, NULL, NULL, NULL);
>>>       while (data.n == 0) {
>>>           aio_poll(ctx, true);
>>>       }
>>> diff --git a/util/thread-pool.c b/util/thread-pool.c
>>> index 69a87ee79252..2bf3be875a51 100644
>>> --- a/util/thread-pool.c
>>> +++ b/util/thread-pool.c
>>> @@ -60,6 +60,7 @@ struct ThreadPool {
>>>       QemuMutex lock;
>>>       QemuCond worker_stopped;
>>>       QemuCond request_cond;
>>> +    QemuCond no_requests_cond;
>>>       QEMUBH *new_thread_bh;
>>>   
>>>       /* The following variables are only accessed from one AioContext. */
>>> @@ -73,6 +74,7 @@ struct ThreadPool {
>>>       int pending_threads; /* threads created but not running yet */
>>>       int min_threads;
>>>       int max_threads;
>>> +    size_t requests_executing;
>> 
>> What's with size_t? Should this be a uint32_t instead?
>
> Sizes of objects are normally size_t, since otherwise bad
> things happen if objects are bigger than 4 GiB.

Ok, but requests_executing is not the size of an object. It's the number
of objects in a linked list that satisfy a certain predicate. There are
no address space size considerations here.

>
> Considering that the minimum object size is 1 byte, the
> max count of distinct objects also needs a size_t to not
> risk an overflow.

I'm not sure I follow; there's no overflow since you're bounds-checking
with the assert. Or is this a more abstract line of thought about how
many ThreadPoolElements can be present in memory at a time, and you'd
like a type that's certain to fit the theoretical number of objects?
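
To make the suggestion concrete, this is all I had in mind (a hypothetical
variant, keeping the same bounds-checking pattern your patch already uses):

========
    /* hypothetical: fixed-width counter instead of size_t */
    uint32_t requests_executing;

    /* in worker_thread(), while holding pool->lock: */
    assert(pool->requests_executing < UINT32_MAX);
    pool->requests_executing++;
========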

>
> I think that while 2^32 requests executing seems unlikely,
> saving 4 bytes is not worth the risk of someone finding a
> vulnerability triggered by overflowing a 32-bit variable
> (not necessarily in the migration code, but in some other
> thread pool user).
>
>>>   };
>>>   
>>>   static void *worker_thread(void *opaque)
>>> @@ -107,6 +109,10 @@ static void *worker_thread(void *opaque)
>>>           req = QTAILQ_FIRST(&pool->request_list);
>>>           QTAILQ_REMOVE(&pool->request_list, req, reqs);
>>>           req->state = THREAD_ACTIVE;
>>> +
>>> +        assert(pool->requests_executing < SIZE_MAX);
>>> +        pool->requests_executing++;
>>> +
>>>           qemu_mutex_unlock(&pool->lock);
>>>   
>>>           ret = req->func(req->arg);
>>> @@ -118,6 +124,14 @@ static void *worker_thread(void *opaque)
>>>   
>>>           qemu_bh_schedule(pool->completion_bh);
>>>           qemu_mutex_lock(&pool->lock);
>>> +
>>> +        assert(pool->requests_executing > 0);
>>> +        pool->requests_executing--;
>>> +
>>> +        if (pool->requests_executing == 0 &&
>>> +            QTAILQ_EMPTY(&pool->request_list)) {
>>> +            qemu_cond_signal(&pool->no_requests_cond);
>>> +        }
>> 
>> An empty requests list and no request in flight means the worker will
>> now exit after the timeout, no? Can you just kick the worker out of the
>> wait and use pool->worker_stopped instead of the new condition variable?
>
> First, not all threads will terminate if either min_threads or max_threads
> isn't 0.

Ah I overlooked the break condition, nevermind.

> They might be zero in the migration thread pool case, but we are adding a
> generic thread pool, so it should be as universal as possible.
> thread_pool_free() can get away with overwriting these values since
> it is destroying the pool anyway.
>
> Also, the *_join() (or whatever its final name will be) operation is
> about waiting for all requests / work items to finish, not about waiting
> for threads to terminate.

Right, but the idea was to piggyback on thread termination to infer (the
obvious) completion of request servicing. We cannot do that, as you've
explained; fine.

> It's essentially a synchronization point for a thread pool, not a cleanup.
>
>>>       }
>>>   
>>>       pool->cur_threads--;
>>> @@ -243,13 +257,16 @@ static const AIOCBInfo thread_pool_aiocb_info = {
>>>       .cancel_async       = thread_pool_cancel,
>>>   };
>>>   
>>> -BlockAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func,
>>> -                                   void *arg, GDestroyNotify arg_destroy,
>>> -                                   BlockCompletionFunc *cb, void *opaque)
>>> +BlockAIOCB *thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func,
>>> +                               void *arg, GDestroyNotify arg_destroy,
>>> +                               BlockCompletionFunc *cb, void *opaque)
>>>   {
>>>       ThreadPoolElement *req;
>>>       AioContext *ctx = qemu_get_current_aio_context();
>>> -    ThreadPool *pool = aio_get_thread_pool(ctx);
>>> +
>>> +    if (!pool) {
>>> +        pool = aio_get_thread_pool(ctx);
>>> +    }
>> 
>> I'd go for a separate implementation to really drive home the point that
>> this new usage is different. See the code snippet below.
>
> I see your point there - will split these implementations.
>
>> It seems we're a short step away from being able to use this
>> implementation in a general way. Is there something that can be done
>> with the 'common' field in the ThreadPoolElement?
>
> The non-AIO request flow still needs the completion callback from BlockAIOCB
> (and its argument pointer), so removing the "common" field from these requests
> would require introducing two "flavors" of ThreadPoolElement.
>
> Not sure the memory savings here are worth the increase in code complexity.

I'm not asking that of you, but I think it should be done
eventually. The QEMU block layer is very particular and I wouldn't want
the use cases for the thread pool to get confused. But I can't see a way
out right now, so let's postpone this and see if anyone else has comments.
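
Just so we don't lose the idea, here's a rough (untested) sketch of what the
two flavors could look like; only the 'common' split is the point, the other
fields just mirror the existing ThreadPoolElement:

========
typedef struct ThreadPoolElement {
    ThreadPool *pool;
    ThreadPoolFunc *func;
    void *arg;
    enum ThreadState state;
    int ret;
    QTAILQ_ENTRY(ThreadPoolElement) reqs;
    QLIST_ENTRY(ThreadPoolElement) all;
} ThreadPoolElement;

typedef struct ThreadPoolAIOElement {
    BlockAIOCB common;           /* only the AIO flavor carries the ACB */
    ThreadPoolElement elem;
} ThreadPoolAIOElement;

/* The completion path would then recover the ACB only for AIO requests, e.g.:
 *
 *     ThreadPoolAIOElement *aio = container_of(elem, ThreadPoolAIOElement, elem);
 *     aio->common.cb(aio->common.opaque, elem->ret);
 */
========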

>
>> ========
>> static void thread_pool_submit_request(ThreadPool *pool,
>>                                        ThreadPoolElement *req)
>> {
>>      req->state = THREAD_QUEUED;
>>      req->pool = pool;
>> 
>>      QLIST_INSERT_HEAD(&pool->head, req, all);
>> 
>>      trace_thread_pool_submit(pool, req, req->arg);
>> 
>>      qemu_mutex_lock(&pool->lock);
>>      if (pool->idle_threads == 0 && pool->cur_threads < pool->max_threads) {
>>          spawn_thread(pool);
>>      }
>>      QTAILQ_INSERT_TAIL(&pool->request_list, req, reqs);
>>      qemu_mutex_unlock(&pool->lock);
>>      qemu_cond_signal(&pool->request_cond);
>> }
>> 
>> BlockAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func, void *arg,
>>                                     BlockCompletionFunc *cb, void *opaque)
>> {
>>      ThreadPoolElement *req;
>>      AioContext *ctx = qemu_get_current_aio_context();
>>      ThreadPool *pool = aio_get_thread_pool(ctx);
>> 
>>      /* Assert that the thread submitting work is the same running the pool */
>>      assert(pool->ctx == qemu_get_current_aio_context());
>> 
>>      req = qemu_aio_get(&thread_pool_aiocb_info, NULL, cb, opaque);
>>      req->func = func;
>>      req->arg = arg;
>> 
>>      thread_pool_submit_request(pool, req);
>>      return &req->common;
>> }
>> 
>> void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, void *arg)
>> {
>>      ThreadPoolElement *req;
>> 
>>      req = g_malloc(sizeof(ThreadPoolElement));
>>      req->func = func;
>>      req->arg = arg;
>> 
>>      thread_pool_submit_request(pool, req);
>> }
>> =================
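
A dedicated-pool user such as migration could then drive it along these lines
(just a sketch; the pool creation and the worker_fn/data names are placeholders):

========
    ThreadPool *pool = thread_pool_new(ctx);

    thread_pool_set_minmax_threads(pool, 0, n_threads);
    thread_pool_submit(pool, worker_fn, data);
    /* ... */
    thread_pool_join(pool);    /* or whatever we end up calling it, see below */
    thread_pool_free(pool);
========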
>> 
>>>   
>>>       /* Assert that the thread submitting work is the same running the pool */
>>>       assert(pool->ctx == qemu_get_current_aio_context());
>>> @@ -275,6 +292,18 @@ BlockAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func,
>>>       return &req->common;
>>>   }
>>>   
>>> +BlockAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func,
>>> +                                   void *arg, GDestroyNotify arg_destroy,
>>> +                                   BlockCompletionFunc *cb, void *opaque)
>>> +{
>>> +    return thread_pool_submit(NULL, func, arg, arg_destroy, cb, opaque);
>>> +}
>>> +
>>> +void thread_pool_poll(ThreadPool *pool)
>>> +{
>>> +    aio_bh_poll(pool->ctx);
>>> +}
>>> +
>>>   typedef struct ThreadPoolCo {
>>>       Coroutine *co;
>>>       int ret;
>>> @@ -297,18 +326,38 @@ int coroutine_fn thread_pool_submit_co(ThreadPoolFunc *func, void *arg)
>>>       return tpc.ret;
>>>   }
>>>   
>>> -void thread_pool_submit(ThreadPoolFunc *func,
>>> -                        void *arg, GDestroyNotify arg_destroy)
>>> +void thread_pool_join(ThreadPool *pool)
>> 
>> This is misleading because it's about the requests, not the threads in
>> the pool. Compare with what thread_pool_free does:
>> 
>>      /* Wait for worker threads to terminate */
>>      pool->max_threads = 0;
>>      qemu_cond_broadcast(&pool->request_cond);
>>      while (pool->cur_threads > 0) {
>>          qemu_cond_wait(&pool->worker_stopped, &pool->lock);
>>      }
>> 
>
> I'm open to better naming proposals for thread_pool_join().

thread_pool_wait() might be better.
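
For the record, these are roughly the semantics I'd associate with that name
(an untested sketch based on the fields this patch adds, not necessarily how
your actual implementation looks):

========
/* Block until no request is queued or executing in this pool. */
void thread_pool_wait(ThreadPool *pool)
{
    qemu_mutex_lock(&pool->lock);
    while (pool->requests_executing > 0 ||
           !QTAILQ_EMPTY(&pool->request_list)) {
        qemu_cond_wait(&pool->no_requests_cond, &pool->lock);
    }
    qemu_mutex_unlock(&pool->lock);
}
========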

>
> Thanks,
> Maciej
