Signed-off-by: Arun R Bharadwaj<a...@linux.vnet.ibm.com>
Signed-off-by: Aneesh Kumar K.V<aneesh.ku...@linux.vnet.ibm.com>
Signed-off-by: Gautham R Shenoy<gautham.she...@gmail.com>
Signed-off-by: Sripathi Kodi<sripat...@in.ibm.com>
---
Makefile.objs | 2
configure | 2
posix-aio-compat.c | 354 +++++++++++++++++++++++++++++++---------------------
3 files changed, 211 insertions(+), 147 deletions(-)
diff --git a/Makefile.objs b/Makefile.objs
index cd5a24b..3b7ec27 100644
--- a/Makefile.objs
+++ b/Makefile.objs
@@ -9,6 +9,7 @@ qobject-obj-y += qerror.o
block-obj-y = cutils.o cache-utils.o qemu-malloc.o qemu-option.o module.o
block-obj-y += nbd.o block.o aio.o aes.o osdep.o qemu-config.o
+block-obj-$(CONFIG_POSIX) += qemu-thread.o
block-obj-$(CONFIG_POSIX) += posix-aio-compat.o
block-obj-$(CONFIG_LINUX_AIO) += linux-aio.o
@@ -124,7 +125,6 @@ endif
common-obj-y += $(addprefix ui/, $(ui-obj-y))
common-obj-y += iov.o acl.o
-common-obj-$(CONFIG_THREAD) += qemu-thread.o
common-obj-y += notify.o event_notifier.o
common-obj-y += qemu-timer.o
diff --git a/configure b/configure
index a079a49..addf733 100755
--- a/configure
+++ b/configure
@@ -2456,7 +2456,6 @@ if test "$vnc_png" != "no" ; then
fi
if test "$vnc_thread" != "no" ; then
echo "CONFIG_VNC_THREAD=y">> $config_host_mak
- echo "CONFIG_THREAD=y">> $config_host_mak
fi
if test "$fnmatch" = "yes" ; then
echo "CONFIG_FNMATCH=y">> $config_host_mak
@@ -2534,7 +2533,6 @@ if test "$xen" = "yes" ; then
fi
if test "$io_thread" = "yes" ; then
echo "CONFIG_IOTHREAD=y">> $config_host_mak
- echo "CONFIG_THREAD=y">> $config_host_mak
fi
if test "$linux_aio" = "yes" ; then
echo "CONFIG_LINUX_AIO=y">> $config_host_mak
diff --git a/posix-aio-compat.c b/posix-aio-compat.c
index 7b862b5..e1812fc 100644
--- a/posix-aio-compat.c
+++ b/posix-aio-compat.c
@@ -29,7 +29,33 @@
#include "block_int.h"
#include "block/raw-posix-aio.h"
+#include "qemu-thread.h"
+#define MAX_GLOBAL_THREADS 64 /* max_threads value used for the global queue */
+#define MIN_GLOBAL_THREADS 8 /* min_threads value used for the global queue */
+
+static QemuMutex aiocb_mutex; /* held around reads and writes of qemu_paiocb.ret */
+static QemuCond aiocb_completion; /* broadcast by aio_thread() after it stores a request's ret */
+
+typedef struct ThreadletQueue
+{ /* A list of pending ThreadletWork items plus the bookkeeping for the worker threads serving it. */
+ QemuMutex lock; /* held while the counters and request_list below are accessed */
+ QemuCond cond; /* signalled by submit_work_to_queue(); idle workers wait on it */
+ int max_threads; /* no further worker is spawned once cur_threads reaches this */
+ int min_threads; /* a worker exits on idle timeout only while cur_threads exceeds this */
+ int cur_threads; /* workers spawned and not yet exited */
+ int idle_threads; /* workers not currently executing a work item */
+ QTAILQ_HEAD(, ThreadletWork) request_list; /* pending work; appended at the tail, taken from the head */
+} ThreadletQueue;
+
+typedef struct ThreadletWork
+{ /* One queued task; struct qemu_paiocb embeds one as its work member. */
+ QTAILQ_ENTRY(ThreadletWork) node; /* linkage in ThreadletQueue.request_list */
+ void (*func)(struct ThreadletWork *work); /* called by a worker thread with the queue lock released */
+} ThreadletWork;
+
+static ThreadletQueue globalqueue; /* the queue used by submit_work() and dequeue_work() */
+static int globalqueue_init; /* set to 1 by submit_work() once globalqueue has been initialised */
struct qemu_paiocb {
BlockDriverAIOCB common;
@@ -44,13 +70,12 @@ struct qemu_paiocb {
int ev_signo;
off_t aio_offset;
- QTAILQ_ENTRY(qemu_paiocb) node;
int aio_type;
ssize_t ret;
- int active;
struct qemu_paiocb *next;
int async_context_id;
+ ThreadletWork work;
};
typedef struct PosixAioState {
@@ -58,64 +83,169 @@ typedef struct PosixAioState {
struct qemu_paiocb *first_aio;
} PosixAioState;
+static void *threadlet_worker(void *data)
+{ /*
+ * Thread body for one worker of the ThreadletQueue passed in data.
+ * Repeatedly takes the first item off request_list and calls its
+ * func() with the queue lock released.  A worker whose wait timed
+ * out with the list still empty exits if cur_threads is above
+ * min_threads; otherwise it goes back to waiting.  Always returns NULL.
+ */
+ ThreadletQueue *queue = data;
-static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
-static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
-static pthread_t thread_id;
-static pthread_attr_t attr;
-static int max_threads = 64;
-static int cur_threads = 0;
-static int idle_threads = 0;
-static QTAILQ_HEAD(, qemu_paiocb) request_list;
+ qemu_mutex_lock(&queue->lock);
+ while (1) {
+ ThreadletWork *work;
+ int ret = 0; /* reset on every pass so a previous timeout does not skip the wait */
+
+ while (QTAILQ_EMPTY(&queue->request_list)&&
+ (ret != ETIMEDOUT)) {
+ /* wait up to 1000s for cond to be signalled or broadcast.
+ * NOTE(review): assumes the third argument is in milliseconds
+ * (10*100000 ms == 1000s); the aio_thread() loop this replaces
+ * used a 10 second timeout - confirm 1000s is intended. */
+ ret = qemu_cond_timedwait((&queue->cond),
+&(queue->lock), 10*100000);
+ }
-#ifdef CONFIG_PREADV
-static int preadv_present = 1;
-#else
-static int preadv_present = 0;
-#endif
+ assert(queue->idle_threads != 0); /* this thread itself is still counted as idle here */
+ if (QTAILQ_EMPTY(&queue->request_list)) {
+ if (queue->cur_threads> queue->min_threads) {
+ /* Timed out with nothing to do and more than min_threads
+ * workers exist: let this one exit.  At or below the
+ * minimum the worker stays and waits again. */
+ break;
+ }
+ } else {
+ work = QTAILQ_FIRST(&queue->request_list);
+ QTAILQ_REMOVE(&queue->request_list, work, node);
-static void die2(int err, const char *what)
-{
- fprintf(stderr, "%s failed: %s\n", what, strerror(err));
- abort();
+ queue->idle_threads--; /* busy from here until func() returns */
+ qemu_mutex_unlock(&queue->lock);
+
+ /* execute the work function without holding the queue lock */
+ work->func(work);
+
+ qemu_mutex_lock(&queue->lock);
+ queue->idle_threads++;
+ }
+ }
+
+ queue->idle_threads--; /* exiting: the lock is still held from the loop above */
+ queue->cur_threads--;
+ qemu_mutex_unlock(&queue->lock);
+
+ return NULL;
}
-static void die(const char *what)
+static void spawn_threadlet(ThreadletQueue *queue)
{
- die2(errno, what);
+ QemuThread thread; /* local handle; not stored anywhere after this function returns */
+
+ queue->cur_threads++; /* counters are updated without locking here; the caller visible in this file holds queue->lock */
+ queue->idle_threads++; /* the new worker counts as idle until it takes a work item */
+
+ qemu_thread_create(&thread, threadlet_worker, queue); /* start a worker on this queue.
+ * NOTE(review): the spawn_thread() this replaces blocked all signals
+ * around thread creation and passed a detached-thread attribute;
+ * confirm qemu_thread_create() gives the worker the same treatment. */
}
-static void mutex_lock(pthread_mutex_t *mutex)
+/**
+ * submit_work_to_queue: Queue a task on @queue for asynchronous execution
+ *                       by one of that queue's worker threads.
+ * @queue: The queue the task is appended to.
+ * @work: The task; a worker will invoke its func callback.
+ *
+ * A new worker is spawned when none is idle and the queue is still below
+ * max_threads; otherwise the queue's condition variable is signalled.
+ */
+static void submit_work_to_queue(ThreadletQueue *queue, ThreadletWork *work)
{
- int ret = pthread_mutex_lock(mutex);
- if (ret) die2(ret, "pthread_mutex_lock");
+    qemu_mutex_lock(&queue->lock);
+
+    if (queue->idle_threads != 0 || queue->cur_threads >= queue->max_threads) {
+        /* a worker is idle, or the thread limit has been reached */
+        qemu_cond_signal(&queue->cond);
+    } else {
+        spawn_threadlet(queue);
+    }
+
+    QTAILQ_INSERT_TAIL(&queue->request_list, work, node);
+    qemu_mutex_unlock(&queue->lock);
}
-static void mutex_unlock(pthread_mutex_t *mutex)
+static void threadlet_queue_init(ThreadletQueue *queue,
+ int max_threads, int min_threads);
+
+/**
+ * submit_work: Queue a task on the global threadlet queue for
+ *              asynchronous execution.
+ * @work: The task to run.
+ *
+ * The global queue is initialised on the first call; the
+ * globalqueue_init flag is tested and set without any locking.
+ */
+static void submit_work(ThreadletWork *work)
{
- int ret = pthread_mutex_unlock(mutex);
- if (ret) die2(ret, "pthread_mutex_unlock");
+    if (globalqueue_init == 0) {
+        /* first submission: set up the global queue before using it */
+        threadlet_queue_init(&globalqueue,
+                             MAX_GLOBAL_THREADS, MIN_GLOBAL_THREADS);
+        globalqueue_init = 1;
+    }
+
+    submit_work_to_queue(&globalqueue, work);
}
-static int cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,
- struct timespec *ts)
+/**
+ * dequeue_work_on_queue: Withdraw a task that is still waiting on a queue.
+ * @queue: The queue to search.
+ * @work: The task to withdraw.
+ *
+ * Only tasks still on the request list can be withdrawn; a task that a
+ * worker has already taken off the list is not found.
+ *
+ * Returns: 0 if the task was found on the list and removed,
+ *          1 otherwise.
+ */
+static int dequeue_work_on_queue(ThreadletQueue *queue, ThreadletWork *work)
{
- int ret = pthread_cond_timedwait(cond, mutex, ts);
- if (ret&& ret != ETIMEDOUT) die2(ret, "pthread_cond_timedwait");
+    ThreadletWork *pending;
+    int ret = 1; /* stays 1 unless the task is found below */
+
+    qemu_mutex_lock(&queue->lock);
+    QTAILQ_FOREACH(pending, &queue->request_list, node) {
+        if (pending != work) {
+            continue;
+        }
+        QTAILQ_REMOVE(&queue->request_list, work, node);
+        ret = 0;
+        break;
+    }
+    qemu_mutex_unlock(&queue->lock);
+
return ret;
}
-static void cond_signal(pthread_cond_t *cond)
+/**
+ * dequeue_work: Withdraw a task that is still waiting on the global queue.
+ * @work: The task to withdraw.
+ *
+ * Returns: 0 if the task was still queued and has been removed,
+ *          1 otherwise.
+ */
+static int dequeue_work(ThreadletWork *work)
+{
+    ThreadletQueue *queue = &globalqueue;
+
+    return dequeue_work_on_queue(queue, work);
+}
+
+/**
+ * threadlet_queue_init: Put a threadlet queue into its initial, empty state.
+ * @queue: The queue to set up.
+ * @max_threads: Upper bound on the number of worker threads for the queue.
+ * @min_threads: Thread count at or below which idle workers do not exit.
+ */
+static void threadlet_queue_init(ThreadletQueue *queue,
+                                 int max_threads, int min_threads)
+{
+    /* synchronisation primitives */
+    qemu_mutex_init(&queue->lock);
+    qemu_cond_init(&queue->cond);
+
+    /* thread limits and counters; no worker exists yet */
+    queue->min_threads = min_threads;
+    queue->max_threads = max_threads;
+    queue->idle_threads = 0;
+    queue->cur_threads = 0;
+
+    /* empty request list */
+    QTAILQ_INIT(&queue->request_list);
+}
+
+#ifdef CONFIG_PREADV
+static int preadv_present = 1;
+#else
+static int preadv_present;
+#endif
+
+static void die2(int err, const char *what)
{
- int ret = pthread_cond_signal(cond);
- if (ret) die2(ret, "pthread_cond_signal");
+ fprintf(stderr, "%s failed: %s\n", what, strerror(err)); /* report what failed and the message for error code err */
+ abort(); /* terminate the process; control does not return to the caller */
}
-static void thread_create(pthread_t *thread, pthread_attr_t *attr,
- void *(*start_routine)(void*), void *arg)
+static void die(const char *what)
{
- int ret = pthread_create(thread, attr, start_routine, arg);
- if (ret) die2(ret, "pthread_create");
+ die2(errno, what); /* same as die2(), with the current errno as the error code */
}
static ssize_t handle_aiocb_ioctl(struct qemu_paiocb *aiocb)
@@ -301,106 +431,58 @@ static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
return nbytes;
}
-static void *aio_thread(void *unused)
+/*
+ * Threadlet work function for one AIO request (installed as work.func by
+ * qemu_paio_submit()).
+ *
+ * Recovers the enclosing qemu_paiocb from @work, performs the operation
+ * selected by aio_type (read/write, flush or ioctl; anything else yields
+ * -EINVAL after a message on stderr), stores the result in aiocb->ret under
+ * aiocb_mutex, wakes anyone waiting on aiocb_completion and finally sends
+ * the request's signal to this process.  Aborts via die() if kill() fails.
+ */
+static void aio_thread(ThreadletWork *work)
{
pid_t pid;
+    struct qemu_paiocb *aiocb = container_of(work, struct qemu_paiocb, work);
+    ssize_t ret = 0;
+    /*
+     * Read the signal number up front: once ret is published below,
+     * paio_cancel() may stop waiting and go on to paio_remove() the
+     * request, so aiocb must not be dereferenced after that point.
+     */
+    int signo = aiocb->ev_signo;
pid = getpid();
- while (1) {
- struct qemu_paiocb *aiocb;
- ssize_t ret = 0;
- qemu_timeval tv;
- struct timespec ts;
-
- qemu_gettimeofday(&tv);
- ts.tv_sec = tv.tv_sec + 10;
- ts.tv_nsec = 0;
-
- mutex_lock(&lock);
-
- while (QTAILQ_EMPTY(&request_list)&&
- !(ret == ETIMEDOUT)) {
- ret = cond_timedwait(&cond,&lock,&ts);
- }
-
- if (QTAILQ_EMPTY(&request_list))
- break;
-
- aiocb = QTAILQ_FIRST(&request_list);
- QTAILQ_REMOVE(&request_list, aiocb, node);
- aiocb->active = 1;
- idle_threads--;
- mutex_unlock(&lock);
-
- switch (aiocb->aio_type& QEMU_AIO_TYPE_MASK) {
- case QEMU_AIO_READ:
- case QEMU_AIO_WRITE:
- ret = handle_aiocb_rw(aiocb);
- break;
- case QEMU_AIO_FLUSH:
- ret = handle_aiocb_flush(aiocb);
- break;
- case QEMU_AIO_IOCTL:
- ret = handle_aiocb_ioctl(aiocb);
- break;
- default:
- fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
- ret = -EINVAL;
- break;
- }
-
- mutex_lock(&lock);
- aiocb->ret = ret;
- idle_threads++;
- mutex_unlock(&lock);
-
- if (kill(pid, aiocb->ev_signo)) die("kill failed");
+    switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
+    case QEMU_AIO_READ:
+    case QEMU_AIO_WRITE:
+        ret = handle_aiocb_rw(aiocb);
+        break;
+    case QEMU_AIO_FLUSH:
+        ret = handle_aiocb_flush(aiocb);
+        break;
+    case QEMU_AIO_IOCTL:
+        ret = handle_aiocb_ioctl(aiocb);
+        break;
+    default:
+        fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
+        ret = -EINVAL;
+        break;
}
- idle_threads--;
- cur_threads--;
- mutex_unlock(&lock);
-
- return NULL;
-}
-
-static void spawn_thread(void)
-{
- sigset_t set, oldset;
-
- cur_threads++;
- idle_threads++;
+    /* publish the result and wake a canceller waiting in paio_cancel() */
+    qemu_mutex_lock(&aiocb_mutex);
+    aiocb->ret = ret;
+    qemu_cond_broadcast(&aiocb_completion);
+    qemu_mutex_unlock(&aiocb_mutex);
- /* block all signals */
- if (sigfillset(&set)) die("sigfillset");
- if (sigprocmask(SIG_SETMASK,&set,&oldset)) die("sigprocmask");
-
- thread_create(&thread_id,&attr, aio_thread, NULL);
-
- if (sigprocmask(SIG_SETMASK,&oldset, NULL)) die("sigprocmask restore");
+    /* notify the process with the cached signal; aiocb is not touched here */
+    if (kill(pid, signo)) {
+        die("kill failed");
+    }
}
static void qemu_paio_submit(struct qemu_paiocb *aiocb)
{
+ qemu_mutex_lock(&aiocb_mutex); /* the status below is written under aiocb_mutex, like every other access to ret in this patch */
aiocb->ret = -EINPROGRESS;
- aiocb->active = 0;
- mutex_lock(&lock);
- if (idle_threads == 0&& cur_threads< max_threads)
- spawn_thread();
- QTAILQ_INSERT_TAIL(&request_list, aiocb, node);
- mutex_unlock(&lock);
- cond_signal(&cond);
+ qemu_mutex_unlock(&aiocb_mutex);
+
+ aiocb->work.func = aio_thread; /* a worker thread will call aio_thread() with &aiocb->work */
+ submit_work(&aiocb->work); /* hand the request to the global threadlet queue */
}
static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
{
ssize_t ret;
- mutex_lock(&lock);
+ qemu_mutex_lock(&aiocb_mutex); /* the same mutex aio_thread() holds while storing ret */
ret = aiocb->ret;
- mutex_unlock(&lock);
-
+ qemu_mutex_unlock(&aiocb_mutex); /* ret was copied to a local above, so it is returned after unlocking */
return ret;
}
@@ -534,22 +616,14 @@ static void paio_remove(struct qemu_paiocb *acb)
+/*
+ * Cancel an AIO request.  A request still waiting on the global threadlet
+ * queue is withdrawn and marked -ECANCELED; otherwise this waits on
+ * aiocb_completion until the request's ret is no longer -EINPROGRESS.
+ * Either way the request is then passed to paio_remove().
+ */
static void paio_cancel(BlockDriverAIOCB *blockacb)
{
struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
- int active = 0;
-
- mutex_lock(&lock);
- if (!acb->active) {
- QTAILQ_REMOVE(&request_list, acb, node);
- acb->ret = -ECANCELED;
- } else if (acb->ret == -EINPROGRESS) {
- active = 1;
- }
- mutex_unlock(&lock);
- if (active) {
- /* fail safe: if the aio could not be canceled, we wait for
- it */
- while (qemu_paio_error(acb) == EINPROGRESS)
- ;
+    if (dequeue_work(&acb->work) == 0) {
+        /*
+         * Still queued, so no worker will ever run it: record the
+         * cancellation, as the code being replaced did, instead of
+         * leaving the request marked -EINPROGRESS.
+         */
+        qemu_mutex_lock(&aiocb_mutex);
+        acb->ret = -ECANCELED;
+        qemu_mutex_unlock(&aiocb_mutex);
+    } else {
+        /* Not on the queue any more: wait until its result is published */
+        qemu_mutex_lock(&aiocb_mutex);
+        while (acb->ret == -EINPROGRESS) {
+            qemu_cond_wait(&aiocb_completion, &aiocb_mutex);
+        }
+        qemu_mutex_unlock(&aiocb_mutex);
}
paio_remove(acb);
@@ -618,11 +692,13 @@ int paio_init(void)
struct sigaction act;
PosixAioState *s;
int fds[2];
- int ret;
if (posix_aio_state)
return 0;
+ qemu_mutex_init(&aiocb_mutex);
+ qemu_cond_init(&aiocb_completion);
+
s = qemu_malloc(sizeof(PosixAioState));
sigfillset(&act.sa_mask);
@@ -645,16 +721,6 @@ int paio_init(void)
qemu_aio_set_fd_handler(s->rfd, posix_aio_read, NULL, posix_aio_flush,
posix_aio_process_queue, s);
- ret = pthread_attr_init(&attr);
- if (ret)
- die2(ret, "pthread_attr_init");
-
- ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
- if (ret)
- die2(ret, "pthread_attr_setdetachstate");
-
- QTAILQ_INIT(&request_list);
-
posix_aio_state = s;
return 0;
}