On 11/18/2010 10:07 AM, Arun R Bharadwaj wrote:
> Move qemu_paio_submit() to the new infrastructure and
> introduce the necessary APIs.
>
> Signed-off-by: Arun R Bharadwaj <a...@linux.vnet.ibm.com>
Looks good to me. Acked-by: Venkateswararao Jujjuri <jv...@linux.vnet.ibm.com> > --- > posix-aio-compat.c | 224 > ++++++++++++++++++++++++++++++++++------------------ > 1 files changed, 147 insertions(+), 77 deletions(-) > > diff --git a/posix-aio-compat.c b/posix-aio-compat.c > index eb1e2db..3f3c461 100644 > --- a/posix-aio-compat.c > +++ b/posix-aio-compat.c > @@ -31,6 +31,9 @@ > > #include "block/raw-posix-aio.h" > > +#define MAX_GLOBAL_THREADS 64 > +#define MIN_GLOBAL_THREADS 8 > + > static QemuMutex aiocb_mutex; > static QemuCond aiocb_completion; > > @@ -52,6 +55,7 @@ typedef struct ThreadletWork > } ThreadletWork; > > static ThreadletQueue globalqueue; > +static int globalqueue_init; > > struct qemu_paiocb { > BlockDriverAIOCB common; > @@ -81,6 +85,100 @@ typedef struct PosixAioState { > struct qemu_paiocb *first_aio; > } PosixAioState; > > +static void *threadlet_worker(void *data) > +{ > + ThreadletQueue *queue = data; > + > + qemu_mutex_lock(&queue->lock); > + while (1) { > + ThreadletWork *work; > + int ret = 0; > + > + while (QTAILQ_EMPTY(&queue->request_list) && > + (ret != ETIMEDOUT)) { > + /* wait for cond to be signalled or broadcast for 1000s */ > + ret = qemu_cond_timedwait((&queue->cond), > + &(queue->lock), 10*100000); > + } > + > + assert(queue->idle_threads != 0); > + if (QTAILQ_EMPTY(&queue->request_list)) { > + if (queue->cur_threads > queue->min_threads) { > + /* We retain the minimum number of threads */ > + break; > + } > + } else { > + work = QTAILQ_FIRST(&queue->request_list); > + QTAILQ_REMOVE(&queue->request_list, work, node); > + > + queue->idle_threads--; > + qemu_mutex_unlock(&queue->lock); > + > + /* execute the work function */ > + work->func(work); > + > + qemu_mutex_lock(&queue->lock); > + queue->idle_threads++; > + } > + } > + > + queue->idle_threads--; > + queue->cur_threads--; > + qemu_mutex_unlock(&queue->lock); > + > + return NULL; > +} > + > +static void spawn_threadlet(ThreadletQueue *queue) > +{ > + QemuThread thread; 
> + > + queue->cur_threads++; > + queue->idle_threads++; > + > + qemu_thread_create(&thread, threadlet_worker, queue); > +} > + > +/** > + * submit_work_to_queue: Submit a new task to a private queue to be > + * executed asynchronously. > + * @queue: Per-subsystem private queue to which the new task needs > + * to be submitted. > + * @work: Contains information about the task that needs to be submitted. > + */ > + > +static void submit_work_to_queue(ThreadletQueue *queue, ThreadletWork *work) > +{ > + qemu_mutex_lock(&queue->lock); > + if (queue->idle_threads == 0 && queue->cur_threads < queue->max_threads) > { > + spawn_threadlet(queue); > + } else { > + qemu_cond_signal(&queue->cond); > + } > + QTAILQ_INSERT_TAIL(&queue->request_list, work, node); > + qemu_mutex_unlock(&queue->lock); > +} > + > +static void threadlet_queue_init(ThreadletQueue *queue, > + int max_threads, int min_threads); > + > +/** > + * submit_work: Submit to the global queue a new task to be executed > + * asynchronously. > + * @work: Contains information about the task that needs to be submitted. > + */ > + > +static void submit_work(ThreadletWork *work) > +{ > + if (!globalqueue_init) { > + threadlet_queue_init(&globalqueue, MAX_GLOBAL_THREADS, > + MIN_GLOBAL_THREADS); > + globalqueue_init = 1; > + } > + > + submit_work_to_queue(&globalqueue, work); > +} > + > /** > * dequeue_work_on_queue: Cancel a task queued on a Queue. > * @queue: The queue containing the task to be cancelled. > @@ -121,6 +219,25 @@ static int dequeue_work(ThreadletWork *work) > return dequeue_work_on_queue(&globalqueue, work); > } > > +/** > + * threadlet_queue_init: Initialize a threadlet queue. > + * @queue: The threadlet queue to be initialized. > + * @max_threads: Maximum number of threads processing the queue. > + * @min_threads: Minimum number of threads processing the queue. 
> + */ > + > +static void threadlet_queue_init(ThreadletQueue *queue, > + int max_threads, int min_threads) > +{ > + queue->cur_threads = 0; > + queue->idle_threads = 0; > + queue->max_threads = max_threads; > + queue->min_threads = min_threads; > + QTAILQ_INIT(&queue->request_list); > + qemu_mutex_init(&queue->lock); > + qemu_cond_init(&queue->cond); > +} > + > static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER; > static pthread_cond_t cond = PTHREAD_COND_INITIALIZER; > static pthread_t thread_id; > @@ -363,96 +480,49 @@ static ssize_t handle_aiocb_rw(struct qemu_paiocb > *aiocb) > return nbytes; > } > > -static void *aio_thread(void *unused) > +static void aio_thread(ThreadletWork *work) > { > pid_t pid; > + struct qemu_paiocb *aiocb = container_of(work, struct qemu_paiocb, work); > + ssize_t ret = 0; > > pid = getpid(); > > - while (1) { > - struct qemu_paiocb *aiocb; > - ssize_t ret = 0; > - qemu_timeval tv; > - struct timespec ts; > - > - qemu_gettimeofday(&tv); > - ts.tv_sec = tv.tv_sec + 10; > - ts.tv_nsec = 0; > - > - mutex_lock(&lock); > - > - while (QTAILQ_EMPTY(&request_list) && > - !(ret == ETIMEDOUT)) { > - ret = cond_timedwait(&cond, &lock, &ts); > - } > - > - if (QTAILQ_EMPTY(&request_list)) > - break; > - > - aiocb = QTAILQ_FIRST(&request_list); > - QTAILQ_REMOVE(&request_list, aiocb, node); > - aiocb->active = 1; > - idle_threads--; > - mutex_unlock(&lock); > - > - switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) { > - case QEMU_AIO_READ: > - case QEMU_AIO_WRITE: > - ret = handle_aiocb_rw(aiocb); > - break; > - case QEMU_AIO_FLUSH: > - ret = handle_aiocb_flush(aiocb); > - break; > - case QEMU_AIO_IOCTL: > - ret = handle_aiocb_ioctl(aiocb); > - break; > - default: > - fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type); > - ret = -EINVAL; > - break; > - } > - > - mutex_lock(&lock); > - aiocb->ret = ret; > - idle_threads++; > - mutex_unlock(&lock); > - > - if (kill(pid, aiocb->ev_signo)) die("kill failed"); > + switch (aiocb->aio_type 
& QEMU_AIO_TYPE_MASK) { > + case QEMU_AIO_READ: > + case QEMU_AIO_WRITE: > + ret = handle_aiocb_rw(aiocb); > + break; > + case QEMU_AIO_FLUSH: > + ret = handle_aiocb_flush(aiocb); > + break; > + case QEMU_AIO_IOCTL: > + ret = handle_aiocb_ioctl(aiocb); > + break; > + default: > + fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type); > + ret = -EINVAL; > + break; > } > > - idle_threads--; > - cur_threads--; > - mutex_unlock(&lock); > - > - return NULL; > -} > - > -static void spawn_thread(void) > -{ > - sigset_t set, oldset; > - > - cur_threads++; > - idle_threads++; > - > - /* block all signals */ > - if (sigfillset(&set)) die("sigfillset"); > - if (sigprocmask(SIG_SETMASK, &set, &oldset)) die("sigprocmask"); > - > - thread_create(&thread_id, &attr, aio_thread, NULL); > + qemu_mutex_lock(&aiocb_mutex); > + aiocb->ret = ret; > + qemu_cond_broadcast(&aiocb_completion); > + qemu_mutex_unlock(&aiocb_mutex); > > - if (sigprocmask(SIG_SETMASK, &oldset, NULL)) die("sigprocmask restore"); > + if (kill(pid, aiocb->ev_signo)) { > + die("kill failed"); > + } > } > > static void qemu_paio_submit(struct qemu_paiocb *aiocb) > { > + qemu_mutex_lock(&aiocb_mutex); > aiocb->ret = -EINPROGRESS; > - aiocb->active = 0; > - mutex_lock(&lock); > - if (idle_threads == 0 && cur_threads < max_threads) > - spawn_thread(); > - QTAILQ_INSERT_TAIL(&request_list, aiocb, node); > - mutex_unlock(&lock); > - cond_signal(&cond); > + qemu_mutex_unlock(&aiocb_mutex); > + > + aiocb->work.func = aio_thread; > + submit_work(&aiocb->work); > } > > static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb) > >