Move qemu_paio_submit() to the new threadlet infrastructure and introduce the APIs it needs: submit_work(), submit_work_to_queue() and threadlet_queue_init().
Signed-off-by: Arun R Bharadwaj <a...@linux.vnet.ibm.com> --- posix-aio-compat.c | 224 ++++++++++++++++++++++++++++++++++------------------ 1 files changed, 147 insertions(+), 77 deletions(-) diff --git a/posix-aio-compat.c b/posix-aio-compat.c index eb1e2db..3f3c461 100644 --- a/posix-aio-compat.c +++ b/posix-aio-compat.c @@ -31,6 +31,9 @@ #include "block/raw-posix-aio.h" +#define MAX_GLOBAL_THREADS 64 +#define MIN_GLOBAL_THREADS 8 + static QemuMutex aiocb_mutex; static QemuCond aiocb_completion; @@ -52,6 +55,7 @@ typedef struct ThreadletWork } ThreadletWork; static ThreadletQueue globalqueue; +static int globalqueue_init; struct qemu_paiocb { BlockDriverAIOCB common; @@ -81,6 +85,100 @@ typedef struct PosixAioState { struct qemu_paiocb *first_aio; } PosixAioState; +static void *threadlet_worker(void *data) +{ + ThreadletQueue *queue = data; + + qemu_mutex_lock(&queue->lock); + while (1) { + ThreadletWork *work; + int ret = 0; + + while (QTAILQ_EMPTY(&queue->request_list) && + (ret != ETIMEDOUT)) { + /* wait for cond to be signalled or broadcast for 1000s */ + ret = qemu_cond_timedwait((&queue->cond), + &(queue->lock), 10*100000); + } + + assert(queue->idle_threads != 0); + if (QTAILQ_EMPTY(&queue->request_list)) { + if (queue->cur_threads > queue->min_threads) { + /* We retain the minimum number of threads */ + break; + } + } else { + work = QTAILQ_FIRST(&queue->request_list); + QTAILQ_REMOVE(&queue->request_list, work, node); + + queue->idle_threads--; + qemu_mutex_unlock(&queue->lock); + + /* execute the work function */ + work->func(work); + + qemu_mutex_lock(&queue->lock); + queue->idle_threads++; + } + } + + queue->idle_threads--; + queue->cur_threads--; + qemu_mutex_unlock(&queue->lock); + + return NULL; +} + +static void spawn_threadlet(ThreadletQueue *queue) +{ + QemuThread thread; + + queue->cur_threads++; + queue->idle_threads++; + + qemu_thread_create(&thread, threadlet_worker, queue); +} + +/** + * submit_work_to_queue: Submit a new task to a private 
queue to be + * executed asynchronously. + * @queue: Per-subsystem private queue to which the new task needs + * to be submitted. + * @work: Contains information about the task that needs to be submitted. + */ + +static void submit_work_to_queue(ThreadletQueue *queue, ThreadletWork *work) +{ + qemu_mutex_lock(&queue->lock); + if (queue->idle_threads == 0 && queue->cur_threads < queue->max_threads) { + spawn_threadlet(queue); + } else { + qemu_cond_signal(&queue->cond); + } + QTAILQ_INSERT_TAIL(&queue->request_list, work, node); + qemu_mutex_unlock(&queue->lock); +} + +static void threadlet_queue_init(ThreadletQueue *queue, + int max_threads, int min_threads); + +/** + * submit_work: Submit to the global queue a new task to be executed + * asynchronously. + * @work: Contains information about the task that needs to be submitted. + */ + +static void submit_work(ThreadletWork *work) +{ + if (!globalqueue_init) { + threadlet_queue_init(&globalqueue, MAX_GLOBAL_THREADS, + MIN_GLOBAL_THREADS); + globalqueue_init = 1; + } + + submit_work_to_queue(&globalqueue, work); +} + /** * dequeue_work_on_queue: Cancel a task queued on a Queue. * @queue: The queue containing the task to be cancelled. @@ -121,6 +219,25 @@ static int dequeue_work(ThreadletWork *work) return dequeue_work_on_queue(&globalqueue, work); } +/** + * threadlet_queue_init: Initialize a threadlet queue. + * @queue: The threadlet queue to be initialized. + * @max_threads: Maximum number of threads processing the queue. + * @min_threads: Minimum number of threads processing the queue. 
+ */ + +static void threadlet_queue_init(ThreadletQueue *queue, + int max_threads, int min_threads) +{ + queue->cur_threads = 0; + queue->idle_threads = 0; + queue->max_threads = max_threads; + queue->min_threads = min_threads; + QTAILQ_INIT(&queue->request_list); + qemu_mutex_init(&queue->lock); + qemu_cond_init(&queue->cond); +} + static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER; static pthread_cond_t cond = PTHREAD_COND_INITIALIZER; static pthread_t thread_id; @@ -363,96 +480,49 @@ static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb) return nbytes; } -static void *aio_thread(void *unused) +static void aio_thread(ThreadletWork *work) { pid_t pid; + struct qemu_paiocb *aiocb = container_of(work, struct qemu_paiocb, work); + ssize_t ret = 0; pid = getpid(); - while (1) { - struct qemu_paiocb *aiocb; - ssize_t ret = 0; - qemu_timeval tv; - struct timespec ts; - - qemu_gettimeofday(&tv); - ts.tv_sec = tv.tv_sec + 10; - ts.tv_nsec = 0; - - mutex_lock(&lock); - - while (QTAILQ_EMPTY(&request_list) && - !(ret == ETIMEDOUT)) { - ret = cond_timedwait(&cond, &lock, &ts); - } - - if (QTAILQ_EMPTY(&request_list)) - break; - - aiocb = QTAILQ_FIRST(&request_list); - QTAILQ_REMOVE(&request_list, aiocb, node); - aiocb->active = 1; - idle_threads--; - mutex_unlock(&lock); - - switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) { - case QEMU_AIO_READ: - case QEMU_AIO_WRITE: - ret = handle_aiocb_rw(aiocb); - break; - case QEMU_AIO_FLUSH: - ret = handle_aiocb_flush(aiocb); - break; - case QEMU_AIO_IOCTL: - ret = handle_aiocb_ioctl(aiocb); - break; - default: - fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type); - ret = -EINVAL; - break; - } - - mutex_lock(&lock); - aiocb->ret = ret; - idle_threads++; - mutex_unlock(&lock); - - if (kill(pid, aiocb->ev_signo)) die("kill failed"); + switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) { + case QEMU_AIO_READ: + case QEMU_AIO_WRITE: + ret = handle_aiocb_rw(aiocb); + break; + case QEMU_AIO_FLUSH: + ret = 
handle_aiocb_flush(aiocb); + break; + case QEMU_AIO_IOCTL: + ret = handle_aiocb_ioctl(aiocb); + break; + default: + fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type); + ret = -EINVAL; + break; } - idle_threads--; - cur_threads--; - mutex_unlock(&lock); - - return NULL; -} - -static void spawn_thread(void) -{ - sigset_t set, oldset; - - cur_threads++; - idle_threads++; - - /* block all signals */ - if (sigfillset(&set)) die("sigfillset"); - if (sigprocmask(SIG_SETMASK, &set, &oldset)) die("sigprocmask"); - - thread_create(&thread_id, &attr, aio_thread, NULL); + qemu_mutex_lock(&aiocb_mutex); + aiocb->ret = ret; + qemu_cond_broadcast(&aiocb_completion); + qemu_mutex_unlock(&aiocb_mutex); - if (sigprocmask(SIG_SETMASK, &oldset, NULL)) die("sigprocmask restore"); + if (kill(pid, aiocb->ev_signo)) { + die("kill failed"); + } } static void qemu_paio_submit(struct qemu_paiocb *aiocb) { + qemu_mutex_lock(&aiocb_mutex); aiocb->ret = -EINPROGRESS; - aiocb->active = 0; - mutex_lock(&lock); - if (idle_threads == 0 && cur_threads < max_threads) - spawn_thread(); - QTAILQ_INSERT_TAIL(&request_list, aiocb, node); - mutex_unlock(&lock); - cond_signal(&cond); + qemu_mutex_unlock(&aiocb_mutex); + + aiocb->work.func = aio_thread; + submit_work(&aiocb->work); } static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)