This patch moves the threadlet queue API code to qemu-threadlets.c where these APIs can be used by other subsystems.
Signed-off-by: Arun R Bharadwaj <a...@linux.vnet.ibm.com> Reviewed-by: Stefan Hajnoczi <stefa...@linux.vnet.ibm.com> --- posix-aio-compat.c | 137 ---------------------------------------------------- qemu-threadlet.c | 115 ++++++++++++++++++++++++++++++++++++++++++++ qemu-threadlet.h | 25 +++++++++ 3 files changed, 141 insertions(+), 136 deletions(-) diff --git a/posix-aio-compat.c b/posix-aio-compat.c index 96e28db..4fc9581 100644 --- a/posix-aio-compat.c +++ b/posix-aio-compat.c @@ -26,33 +26,13 @@ #include "qemu-common.h" #include "trace.h" #include "block_int.h" -#include "qemu-thread.h" +#include "qemu-threadlet.h" #include "block/raw-posix-aio.h" -#define MAX_GLOBAL_THREADS 64 -#define MIN_GLOBAL_THREADS 8 - static QemuMutex aiocb_mutex; static QemuCond aiocb_completion; -typedef struct ThreadletQueue -{ - QemuMutex lock; - QemuCond cond; - int max_threads; - int min_threads; - int cur_threads; - int idle_threads; - QTAILQ_HEAD(, ThreadletWork) request_list; -} ThreadletQueue; - -typedef struct ThreadletWork -{ - QTAILQ_ENTRY(ThreadletWork) node; - void (*func)(struct ThreadletWork *work); -} ThreadletWork; - struct qemu_paiocb { BlockDriverAIOCB common; int aio_fildes; @@ -79,10 +59,6 @@ typedef struct PosixAioState { struct qemu_paiocb *first_aio; } PosixAioState; -/* Default ThreadletQueue */ -static ThreadletQueue globalqueue; -static int globalqueue_init; - #ifdef CONFIG_PREADV static int preadv_present = 1; #else @@ -283,50 +259,6 @@ static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb) return nbytes; } -static void *threadlet_worker(void *data) -{ - ThreadletQueue *queue = data; - - qemu_mutex_lock(&queue->lock); - while (1) { - ThreadletWork *work; - int ret = 0; - - while (QTAILQ_EMPTY(&queue->request_list) && - (ret != ETIMEDOUT)) { - /* wait for cond to be signalled or broadcast for 1000s */ - ret = qemu_cond_timedwait((&queue->cond), - &(queue->lock), 10*100000); - } - - assert(queue->idle_threads != 0); - if (QTAILQ_EMPTY(&queue->request_list)) { - if (queue->cur_threads > queue->min_threads) { - /* We retain the minimum number of threads */ - break; - } - } else { - work = QTAILQ_FIRST(&queue->request_list); - QTAILQ_REMOVE(&queue->request_list, work, node); - - queue->idle_threads--; - qemu_mutex_unlock(&queue->lock); - - /* execute the work function */ - work->func(work); - - qemu_mutex_lock(&queue->lock); - queue->idle_threads++; - } - } - - queue->idle_threads--; - queue->cur_threads--; - qemu_mutex_unlock(&queue->lock); - - return NULL; -} - static PosixAioState *posix_aio_state; static void handle_work(ThreadletWork *work) @@ -371,48 +303,6 @@ static void handle_work(ThreadletWork *work) } } -static void spawn_threadlet(ThreadletQueue *queue) -{ - QemuThread thread; - - queue->cur_threads++; - queue->idle_threads++; - - qemu_thread_create(&thread, threadlet_worker, queue); -} - -/** - * submit_work: Submit to the global queue a new task to be executed - * asynchronously. - * @work: Contains information about the task that needs to be submitted. - */ -static void submit_work(ThreadletWork *work) -{ - qemu_mutex_lock(&globalqueue.lock); - - if (!globalqueue_init) { - globalqueue.cur_threads = 0; - globalqueue.idle_threads = 0; - globalqueue.max_threads = MAX_GLOBAL_THREADS; - globalqueue.min_threads = MIN_GLOBAL_THREADS; - QTAILQ_INIT(&globalqueue.request_list); - qemu_mutex_init(&globalqueue.lock); - qemu_cond_init(&globalqueue.cond); - - globalqueue_init = 1; - } - - if (globalqueue.idle_threads == 0 && - globalqueue.cur_threads < globalqueue.max_threads) { - spawn_threadlet(&globalqueue); - - } else { - qemu_cond_signal(&globalqueue.cond); - } - QTAILQ_INSERT_TAIL(&globalqueue.request_list, work, node); - qemu_mutex_unlock(&globalqueue.lock); -} - static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb) { ssize_t ret; @@ -545,31 +435,6 @@ static void paio_remove(struct qemu_paiocb *acb) } } -/** - * dequeue_work: Cancel a task queued on the global queue. - * @work: Contains the information of the task that needs to be cancelled. - * - * Returns: 0 if successfully dequeued work. - * 1 otherwise. - */ -static int dequeue_work(ThreadletWork *work) -{ - int ret = 1; - ThreadletWork *ret_work; - - qemu_mutex_lock(&globalqueue.lock); - QTAILQ_FOREACH(ret_work, &(globalqueue.request_list), node) { - if (ret_work == work) { - QTAILQ_REMOVE(&globalqueue.request_list, ret_work, node); - ret = 0; - break; - } - } - qemu_mutex_unlock(&globalqueue.lock); - - return ret; -} - static void paio_cancel(BlockDriverAIOCB *blockacb) { struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb; diff --git a/qemu-threadlet.c b/qemu-threadlet.c index 857d08d..6523f08 100644 --- a/qemu-threadlet.c +++ b/qemu-threadlet.c @@ -19,6 +19,12 @@ #include "qemu-threadlet.h" #include "osdep.h" +#define MAX_GLOBAL_THREADS 64 +#define MIN_GLOBAL_THREADS 8 + +static ThreadletQueue globalqueue; +static int globalqueue_init; + static void threadlet_io_completion_signal_handler(int signum) { qemu_service_io(); @@ -37,3 +43,112 @@ void threadlet_init(void) { threadlet_register_signal_handler(); } + +static void *threadlet_worker(void *data) +{ + ThreadletQueue *queue = data; + + qemu_mutex_lock(&queue->lock); + while (1) { + ThreadletWork *work; + int ret = 0; + + while (QTAILQ_EMPTY(&queue->request_list) && + (ret != ETIMEDOUT)) { + /* wait for cond to be signalled or broadcast for 1000s */ + ret = qemu_cond_timedwait((&queue->cond), + &(queue->lock), 10*100000); + } + + assert(queue->idle_threads != 0); + if (QTAILQ_EMPTY(&queue->request_list)) { + if (queue->cur_threads > queue->min_threads) { + /* We retain the minimum number of threads */ + break; + } + } else { + work = QTAILQ_FIRST(&queue->request_list); + QTAILQ_REMOVE(&queue->request_list, work, node); + + queue->idle_threads--; + qemu_mutex_unlock(&queue->lock); + + /* execute the work function */ + work->func(work); + + qemu_mutex_lock(&queue->lock); + queue->idle_threads++; + } + } + + queue->idle_threads--; + queue->cur_threads--; + qemu_mutex_unlock(&queue->lock); + + return NULL; +} + +static void spawn_threadlet(ThreadletQueue *queue) +{ + QemuThread thread; + + queue->cur_threads++; + queue->idle_threads++; + + qemu_thread_create(&thread, threadlet_worker, queue); +} + +/** + * submit_work: Submit to the global queue a new task to be executed + * asynchronously. + * @work: Contains information about the task that needs to be submitted. + */ +void submit_work(ThreadletWork *work) +{ + if (!globalqueue_init) { + globalqueue.cur_threads = 0; + globalqueue.idle_threads = 0; + globalqueue.max_threads = MAX_GLOBAL_THREADS; + globalqueue.min_threads = MIN_GLOBAL_THREADS; + QTAILQ_INIT(&globalqueue.request_list); + qemu_mutex_init(&globalqueue.lock); + qemu_cond_init(&globalqueue.cond); + + globalqueue_init = 1; + } + + qemu_mutex_lock(&globalqueue.lock); + if (globalqueue.idle_threads == 0 && + globalqueue.cur_threads < globalqueue.max_threads) { + spawn_threadlet(&globalqueue); + } else { + qemu_cond_signal(&globalqueue.cond); + } + QTAILQ_INSERT_TAIL(&globalqueue.request_list, work, node); + qemu_mutex_unlock(&globalqueue.lock); +} + +/** + * dequeue_work: Cancel a task queued on the global queue. + * @work: Contains the information of the task that needs to be cancelled. + * + * Returns: 0 if successfully dequeued work. + * 1 otherwise. + */ +int dequeue_work(ThreadletWork *work) +{ + int ret = 1; + ThreadletWork *ret_work; + + qemu_mutex_lock(&globalqueue.lock); + QTAILQ_FOREACH(ret_work, &(globalqueue.request_list), node) { + if (ret_work == work) { + QTAILQ_REMOVE(&globalqueue.request_list, ret_work, node); + ret = 0; + break; + } + } + qemu_mutex_unlock(&globalqueue.lock); + + return ret; +} diff --git a/qemu-threadlet.h b/qemu-threadlet.h index 2c225d6..6b11c86 100644 --- a/qemu-threadlet.h +++ b/qemu-threadlet.h @@ -14,6 +14,31 @@ * the COPYING file in the top-level directory. */ +#ifndef QEMU_ASYNC_WORK_H +#define QEMU_ASYNC_WORK_H + +#include "qemu-queue.h" #include "qemu-common.h" +#include "qemu-thread.h" + +typedef struct ThreadletQueue +{ + QemuMutex lock; + QemuCond cond; + int max_threads; + int min_threads; + int cur_threads; + int idle_threads; + QTAILQ_HEAD(, ThreadletWork) request_list; +} ThreadletQueue; + +typedef struct ThreadletWork +{ + QTAILQ_ENTRY(ThreadletWork) node; + void (*func)(struct ThreadletWork *work); +} ThreadletWork; +void submit_work(ThreadletWork *work); +int dequeue_work(ThreadletWork *work); void threadlet_init(void); +#endif