Mark accesses to the thread pool request state (req->state and req->ret) as explicitly atomic. Also, in the test-thread-pool code, make accesses to data.n explicitly atomic.
Signed-off-by: Alex Bennée <alex.ben...@linaro.org> --- tests/test-thread-pool.c | 8 ++++---- thread-pool.c | 12 ++++++------ 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/tests/test-thread-pool.c b/tests/test-thread-pool.c index ccdee39..f51e284 100644 --- a/tests/test-thread-pool.c +++ b/tests/test-thread-pool.c @@ -46,10 +46,10 @@ static void test_submit(void) { WorkerTestData data = { .n = 0 }; thread_pool_submit(pool, worker_cb, &data); - while (data.n == 0) { + while (atomic_read(&data.n) == 0) { aio_poll(ctx, true); } - g_assert_cmpint(data.n, ==, 1); + g_assert_cmpint(atomic_read(&data.n), ==, 1); } static void test_submit_aio(void) @@ -128,7 +128,7 @@ static void test_submit_many(void) aio_poll(ctx, true); } for (i = 0; i < 100; i++) { - g_assert_cmpint(data[i].n, ==, 1); + g_assert_cmpint(atomic_read(&data[i].n), ==, 1); g_assert_cmpint(data[i].ret, ==, 0); } } @@ -183,7 +183,7 @@ static void do_test_cancel(bool sync) g_assert_cmpint(num_canceled, <, 100); for (i = 0; i < 100; i++) { - if (data[i].aiocb && data[i].n != 3) { + if (data[i].aiocb && (atomic_read(&data[i].n) != 3)) { if (sync) { /* Canceling the others will be a blocking operation. */ bdrv_aio_cancel(data[i].aiocb); diff --git a/thread-pool.c b/thread-pool.c index 402c778..431a6fb 100644 --- a/thread-pool.c +++ b/thread-pool.c @@ -99,15 +99,15 @@ static void *worker_thread(void *opaque) req = QTAILQ_FIRST(&pool->request_list); QTAILQ_REMOVE(&pool->request_list, req, reqs); - req->state = THREAD_ACTIVE; + atomic_set(&req->state, THREAD_ACTIVE); qemu_mutex_unlock(&pool->lock); ret = req->func(req->arg); - req->ret = ret; + atomic_set(&req->ret, ret); /* Write ret before state. 
*/ smp_wmb(); - req->state = THREAD_DONE; + atomic_set(&req->state, THREAD_DONE); qemu_mutex_lock(&pool->lock); @@ -167,7 +167,7 @@ static void thread_pool_completion_bh(void *opaque) restart: QLIST_FOREACH_SAFE(elem, &pool->head, all, next) { - if (elem->state != THREAD_DONE) { + if (atomic_read(&elem->state) != THREAD_DONE) { continue; } @@ -184,7 +184,7 @@ restart: */ qemu_bh_schedule(pool->completion_bh); - elem->common.cb(elem->common.opaque, elem->ret); + elem->common.cb(elem->common.opaque, atomic_read(&elem->ret)); qemu_aio_unref(elem); goto restart; } else { @@ -201,7 +201,7 @@ static void thread_pool_cancel(BlockAIOCB *acb) trace_thread_pool_cancel(elem, elem->common.opaque); qemu_mutex_lock(&pool->lock); - if (elem->state == THREAD_QUEUED && + if (atomic_read(&elem->state) == THREAD_QUEUED && /* No thread has yet started working on elem. we can try to "steal" * the item from the worker if we can get a signal from the * semaphore. Because this is non-blocking, we can do it with -- 2.7.0