* Arun R B <a...@linux.vnet.ibm.com> [2010-10-19 23:12:45]:

> From: Aneesh Kumar K.V <aneesh.ku...@linux.vnet.ibm.com>
>
> This patch creates a generic asynchronous-task-offloading infrastructure named
> threadlets. The core idea has been borrowed from the threading framework that
> is being used by paio.
>
> The reason for creating this generic infrastructure is so that other subsystems,
> such as virtio-9p could make use of it for offloading tasks that could block.
>
> The patch creates a global queue on-to which subsystems can queue their tasks to
> be executed asynchronously.
>
> The patch also provides API's that allow a subsystem to create a private queue
> with an associated pool of threads.
>
> [...@in.ibm.com: Facelift of the code, Documentation, cancel_threadlet
> and other helpers]
>
> Signed-off-by: Aneesh Kumar K.V <aneesh.ku...@linux.vnet.ibm.com>
> Signed-off-by: Gautham R Shenoy <e...@in.ibm.com>
> Signed-off-by: Sripathi Kodi <sripat...@in.ibm.com>
> Signed-off-by: Arun R Bharadwaj <a...@linux.vnet.ibm.com>
> ---
>  Makefile.objs          |    3 +
>  docs/async-support.txt |  141 +++++++++++++++++++++++++++++++++++++++++
>  qemu-threadlets.c      |  165 ++++++++++++++++++++++++++++++++++++++++++++++++
>  qemu-threadlets.h      |   48 ++++++++++++++
>  4 files changed, 356 insertions(+), 1 deletions(-)
>  create mode 100644 docs/async-support.txt
>  create mode 100644 qemu-threadlets.c
>  create mode 100644 qemu-threadlets.h
>
> diff --git a/Makefile.objs b/Makefile.objs
> index cd5a24b..2cf8aba 100644
> --- a/Makefile.objs
> +++ b/Makefile.objs
> @@ -9,6 +9,8 @@ qobject-obj-y += qerror.o
>
>  block-obj-y = cutils.o cache-utils.o qemu-malloc.o qemu-option.o module.o
>  block-obj-y += nbd.o block.o aio.o aes.o osdep.o qemu-config.o
> +block-obj-$(CONFIG_POSIX) += qemu-thread.o
> +block-obj-$(CONFIG_POSIX) += qemu-threadlets.o
>  block-obj-$(CONFIG_POSIX) += posix-aio-compat.o
>  block-obj-$(CONFIG_LINUX_AIO) += linux-aio.o
>
> @@ -124,7 +126,6 @@ endif
>  common-obj-y += $(addprefix ui/, $(ui-obj-y))
>
>  common-obj-y += iov.o acl.o
> -common-obj-$(CONFIG_THREAD) += qemu-thread.o
>  common-obj-y += notify.o event_notifier.o
>  common-obj-y += qemu-timer.o
>
> diff --git a/docs/async-support.txt b/docs/async-support.txt
> new file mode 100644
> index 0000000..9f22b9a
> --- /dev/null
> +++ b/docs/async-support.txt
> @@ -0,0 +1,141 @@
> +== How to use the threadlets infrastructure supported in Qemu ==
> +
> +== Threadlets ==
> +
> +Q.1: What are threadlets ?
> +A.1: Threadlets is an infrastructure within QEMU that allows other subsystems
> +     to offload possibly blocking work to a queue to be processed by a pool
> +     of threads asynchronously.
> +
> +Q.2: When would one want to use threadlets ?
> +A.2: Threadlets are useful when there are operations that can be performed
> +     outside the context of the VCPU/IO threads inorder to free these latter
> +     to service any other guest requests.
> +
> +Q.3: I have some work that can be executed in an asynchronous context. How
> +     should I go about it ?
> +A.3: One could follow the steps listed below:
> +
> +     - Define a function which would do the asynchronous work.
> +       static void my_threadlet_func(ThreadletWork *work)
> +       {
> +       }
> +
> +     - Declare an object of type ThreadletWork;
> +       ThreadletWork work;
> +
> +
> +     - Assign a value to the "func" member of ThreadletWork object.
> +       work.func = my_threadlet_func;
> +
> +     - Submit the threadlet to the global queue.
> +       submit_threadletwork(&work);
> +
> +     - Continue servicing some other guest operations.
> +
> +Q.4: I want to my_threadlet_func to access some non-global data. How do I do
> +     that ?
> +A.4: Suppose you want my_threadlet_func to access some non-global data-object
> +     of type myPrivateData. In that case one could follow the following steps.
> +
> +     - Define a member of the type ThreadletWork within myPrivateData.
> +       typedef struct MyPrivateData {
> +           ...;
> +           ...;
> +           ...;
> +           ThreadletWork work;
> +       } MyPrivateData;
> +
> +       MyPrivateData my_data;
> +
> +     - Initialize myData.work as described in A.3
> +       myData.work.func = my_threadlet_func;
> +       submit_threadletwork(&myData.work);
> +
> +     - Access the myData object inside my_threadlet_func() using container_of
> +       primitive
> +       static void my_threadlet_func(ThreadletWork *work)
> +       {
> +           myPrivateData *mydata_ptr;
> +           mydata_ptr = container_of(work, myPrivateData, work);
> +
> +           /* mydata_ptr now points to myData object */
> +       }
> +
> +Q.5: Are there any precautions one must take while sharing data with the
> +     Asynchronous thread-pool ?
> +A.5: Yes, make sure that the helper function of the type my_threadlet_func()
> +     does not access/modify data when it can be accessed or modified in the
> +     context of VCPU thread or IO thread. This is because the asynchronous
> +     threads in the pool can run in parallel with the VCPU/IOThreads as shown
> +     in the figure.
> +
> +     A typical workflow is as follows:
> +
> +              VCPU/IOThread
> +                   |
> +                   | (1)
> +                   |
> +                   V
> +             Offload work              (2)
> +      |-------> to threadlets -----------------------------> Helper thread
> +      |            |                                              |
> +      |            |                                              |
> +      |            | (3)                                          | (4)
> +      |            |                                              |
> +      |     Handle other Guest requests                           |
> +      |            |                                              |
> +      |            |                                              V
> +      |            | (3)                            Signal the I/O Thread
> +      |(6)         |                                              |
> +      |            |                                             /
> +      |            |                                            /
> +      |            V                                           /
> +      |      Do the post <---------------------------------/
> +      |      processing               (5)
> +      |            |
> +      |            | (6)
> +      |            V
> +      |-Yes------ More async work?
> +                   |
> +                   | (7)
> +                   No
> +                   |
> +                   |
> +                   .
> +                   .
> +
> +     Hence one needs to make sure that in the steps (3) and (4) which run in
> +     parallel, any global data is accessed within only one context.
> +
> +Q.6: I have queued a threadlet which I want to cancel. How do I do that ?
> +A.6: Threadlets framework provides the API cancel_threadlet:
> +     - int cancel_threadletwork(ThreadletWork *work)
> +
> +     The API scans the ThreadletQueue to see if (work) is present. If it finds
> +     work, it'll dequeue work and return 0.
> +
> +     On the other hand, if it does not find the (work) in the ThreadletQueue,
> +     then it'll return 1. This can imply two things. Either the work is being
> +     processed by one of the helper threads or it has been processed. The
> +     threadlet infrastructure currently _does_not_ distinguish between these
> +     two and the onus is on the caller to do that.
> +
> +Q.7: Apart from the global pool of threads, can I have my own private Queue ?
> +A.7: Yes, the threadlets framework allows subsystems to create their own private
> +     queues with associated pools of threads.
> +
> +     - Define a PrivateQueue
> +       ThreadletQueue myQueue;
> +
> +     - Initialize it:
> +       threadlet_queue_init(&myQueue, my_max_threads, my_min_threads);
> +       where my_max_threads is the maximum number of threads that can be in the
> +       thread pool and my_min_threads is the minimum number of active threads
> +       that can be in the thread-pool.
> +
> +     - Submit work:
> +       submit_threadletwork_to_queue(&myQueue, &my_work);
> +
> +     - Cancel work:
> +       cancel_threadletwork_on_queue(&myQueue, &my_work);
> diff --git a/qemu-threadlets.c b/qemu-threadlets.c
> new file mode 100644
> index 0000000..fd33752
> --- /dev/null
> +++ b/qemu-threadlets.c
> @@ -0,0 +1,165 @@
> +/*
> + * Threadlet support for offloading tasks to be executed asynchronously
> + *
> + * Copyright IBM, Corp. 2008
> + * Copyright IBM, Corp. 2010
> + *
> + * Authors:
> + *  Anthony Liguori     <aligu...@us.ibm.com>
> + *  Aneesh Kumar K.V    <aneesh.ku...@linux.vnet.ibm.com>
> + *  Gautham R Shenoy    <e...@in.ibm.com>
> + *
> + * This work is licensed under the terms of the GNU GPL, version 2.  See
> + * the COPYING file in the top-level directory.
> + */
> +
> +#include "qemu-threadlets.h"
> +#include "osdep.h"
> +
> +#define MAX_GLOBAL_THREADS  64
> +#define MIN_GLOBAL_THREADS  64
> +static ThreadletQueue globalqueue;
> +static int globalqueue_init;
> +
> +static void *threadlet_worker(void *data)
> +{
> +    ThreadletQueue *queue = data;
> +

Ideally you need

    s = pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);

here, but qemu will need a wrapper for this as well.

> +    qemu_mutex_lock(&(queue->lock));
> +    while (1) {
> +        ThreadletWork *work;
> +        int ret = 0;
> +
> +        while (QTAILQ_EMPTY(&(queue->request_list)) &&
> +               (ret != ETIMEDOUT)) {
> +            ret = qemu_cond_timedwait(&(queue->cond),
> +                                      &(queue->lock), 10*100000);

Ewww... what is 10*100000? Can we use something more meaningful, such as
a named constant, please?

> +        }
> +
> +        assert(queue->idle_threads != 0);

This assertion holds because we believe one of the idle_threads actually
did the dequeuing, right?

> +        if (QTAILQ_EMPTY(&(queue->request_list))) {
> +            if (queue->cur_threads > queue->min_threads) {
> +                /* We retain the minimum number of threads */
> +                break;
> +            }
> +        } else {
> +            work = QTAILQ_FIRST(&(queue->request_list));
> +            QTAILQ_REMOVE(&(queue->request_list), work, node);
> +
> +            queue->idle_threads--;
> +            qemu_mutex_unlock(&(queue->lock));
> +
> +            /* execute the work function */
> +            work->func(work);
> +
> +            qemu_mutex_lock(&(queue->lock));
> +            queue->idle_threads++;
> +        }
> +    }
> +
> +    queue->idle_threads--;
> +    queue->cur_threads--;
> +    qemu_mutex_unlock(&(queue->lock));
> +
> +    return NULL;

Does anybody do a join on the exiting thread from the pool?

> +}
> +
> +static void spawn_threadlet(ThreadletQueue *queue)
> +{
> +    QemuThread thread;
> +
> +    queue->cur_threads++;
> +    queue->idle_threads++;
> +
> +    qemu_thread_create(&thread, threadlet_worker, queue);
> +}
> +
> +/**
> + * submit_threadletwork_to_queue: Submit a new task to a private queue to be
> + *                                executed asynchronously.
> + * @queue: Per-subsystem private queue to which the new task needs
> + *         to be submitted.
> + * @work: Contains information about the task that needs to be submitted.
> + */
> +void submit_threadletwork_to_queue(ThreadletQueue *queue, ThreadletWork *work)
> +{
> +    qemu_mutex_lock(&(queue->lock));
> +    if (queue->idle_threads == 0 && queue->cur_threads < queue->max_threads) {
> +        spawn_threadlet(queue);

So we hold queue->lock and spawn the thread, and the spawned thread then
tries to acquire queue->lock.

> +    }
> +    QTAILQ_INSERT_TAIL(&(queue->request_list), work, node);
> +    qemu_mutex_unlock(&(queue->lock));
> +    qemu_cond_signal(&(queue->cond));

In the case that we just spawned the threadlet, the cond_signal is
spurious. If we need predictable scheduling behaviour,
qemu_cond_signal needs to happen with queue->lock held.

I'd rewrite the function as

/**
 * submit_threadletwork_to_queue: Submit a new task to a private queue to be
 *                                executed asynchronously.
 * @queue: Per-subsystem private queue to which the new task needs
 *         to be submitted.
 * @work: Contains information about the task that needs to be submitted.
 */
void submit_threadletwork_to_queue(ThreadletQueue *queue, ThreadletWork *work)
{
    qemu_mutex_lock(&(queue->lock));
    if (queue->idle_threads == 0 && (queue->cur_threads < queue->max_threads)) {
        spawn_threadlet(queue);
    } else {
        qemu_cond_signal(&(queue->cond));
    }
    QTAILQ_INSERT_TAIL(&(queue->request_list), work, node);
    qemu_mutex_unlock(&(queue->lock));
}

> +/**
> + * submit_threadletwork: Submit to the global queue a new task to be executed
> + *                       asynchronously.
> + * @work: Contains information about the task that needs to be submitted.
> + */
> +void submit_threadletwork(ThreadletWork *work)
> +{
> +    if (unlikely(!globalqueue_init)) {
> +        threadlet_queue_init(&globalqueue, MAX_GLOBAL_THREADS,
> +                             MIN_GLOBAL_THREADS);
> +        globalqueue_init = 1;
> +    }

What protects globalqueue_init?

> +
> +    submit_threadletwork_to_queue(&globalqueue, work);
> +}
> +
> +/**
> + * cancel_threadletwork_on_queue: Cancel a task queued on a Queue.
> + * @queue: The queue containing the task to be cancelled.
> + * @work: Contains the information of the task that needs to be cancelled.
> + *
> + * Returns: 0 if the task is successfully cancelled.
> + *          1 otherwise.
> + */
> +int cancel_threadletwork_on_queue(ThreadletQueue *queue, ThreadletWork *work)
> +{
> +    ThreadletWork *ret_work;
> +    int ret = 0;
> +
> +    qemu_mutex_lock(&(queue->lock));
> +    QTAILQ_FOREACH(ret_work, &(queue->request_list), node) {
> +        if (ret_work == work) {
> +            QTAILQ_REMOVE(&(queue->request_list), ret_work, node);
> +            ret = 1;
> +            break;
> +        }
> +    }
> +    qemu_mutex_unlock(&(queue->lock));
> +
> +    return ret;
> +}
> +
> +/**
> + * cancel_threadletwork: Cancel a task queued on the global queue.

NOTE: cancel is a confusing term; thread cancellation is different from
cancelling a job on the global queue. I'd preferably call this
dequeue_threadletwork.

Generic question: is there a reason to use threadletwork as one word,
instead of threadlet_work? Especially since the data structure is called
ThreadletWork.

> + * @work: Contains the information of the task that needs to be cancelled.
> + *
> + * Returns: 0 if the task is successfully cancelled.
> + *          1 otherwise.
> + */
> +int cancel_threadletwork(ThreadletWork *work)
> +{
> +    return cancel_threadletwork_on_queue(&globalqueue, work);
> +}
> +
> +/**
> + * threadlet_queue_init: Initialize a threadlet queue.
> + * @queue: The threadlet queue to be initialized.
> + * @max_threads: Maximum number of threads processing the queue.
> + * @min_threads: Minimum number of threads processing the queue.
> + */
> +void threadlet_queue_init(ThreadletQueue *queue,
> +                          int max_threads, int min_threads)
> +{
> +    queue->cur_threads = 0;
> +    queue->idle_threads = 0;
> +    queue->max_threads = max_threads;
> +    queue->min_threads = min_threads;
> +    QTAILQ_INIT(&(queue->request_list));
> +    qemu_mutex_init(&(queue->lock));
> +    qemu_cond_init(&(queue->cond));
> +}
> diff --git a/qemu-threadlets.h b/qemu-threadlets.h
> new file mode 100644
> index 0000000..9c8f9e5
> --- /dev/null
> +++ b/qemu-threadlets.h
> @@ -0,0 +1,48 @@
> +/*
> + * Threadlet support for offloading tasks to be executed asynchronously
> + *
> + * Copyright IBM, Corp. 2008
> + * Copyright IBM, Corp. 2010
> + *
> + * Authors:
> + *  Anthony Liguori     <aligu...@us.ibm.com>
> + *  Aneesh Kumar K.V    <aneesh.ku...@linux.vnet.ibm.com>
> + *  Gautham R Shenoy    <e...@in.ibm.com>
> + *
> + * This work is licensed under the terms of the GNU GPL, version 2.  See
> + * the COPYING file in the top-level directory.
> + */
> +
> +#ifndef QEMU_ASYNC_WORK_H
> +#define QEMU_ASYNC_WORK_H
> +
> +#include "qemu-queue.h"
> +#include "qemu-common.h"
> +#include "qemu-thread.h"
> +
> +typedef struct ThreadletQueue
> +{
> +    QemuMutex lock;
> +    QemuCond cond;
> +    int max_threads;
> +    int min_threads;
> +    int cur_threads;
> +    int idle_threads;
> +    QTAILQ_HEAD(, ThreadletWork) request_list;
> +} ThreadletQueue;
> +
> +typedef struct ThreadletWork
> +{
> +    QTAILQ_ENTRY(ThreadletWork) node;
> +    void (*func)(struct ThreadletWork *work);
> +} ThreadletWork;
> +
> +extern void submit_threadletwork_to_queue(ThreadletQueue *queue,
> +                                          ThreadletWork *work);
> +extern void submit_threadletwork(ThreadletWork *work);
> +extern int cancel_threadletwork_on_queue(ThreadletQueue *queue,
> +                                         ThreadletWork *work);
> +extern int cancel_threadletwork(ThreadletWork *work);
> +extern void threadlet_queue_init(ThreadletQueue *queue, int max_threads,
> +                                 int min_threads);
> +#endif
>

--
	Three Cheers,
	Balbir
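
On the globalqueue_init question raised in the review: a minimal sketch of
one possible approach, assuming plain POSIX pthread_once() rather than any
QEMU wrapper (the patch does not show one). DemoQueue, demo_globalqueue and
demo_get_globalqueue are hypothetical stand-ins for ThreadletQueue,
globalqueue and the lazy-initialization path in submit_threadletwork(); the
init_count field exists only to make the one-time behaviour observable.

```c
#include <assert.h>
#include <pthread.h>

/* Hypothetical, cut-down stand-in for the patch's ThreadletQueue. */
typedef struct DemoQueue {
    pthread_mutex_t lock;
    int max_threads;
    int min_threads;
    int init_count;     /* demonstration only: number of initializations */
} DemoQueue;

static DemoQueue demo_globalqueue;
static pthread_once_t demo_globalqueue_once = PTHREAD_ONCE_INIT;

/* Runs at most once, no matter how many threads race to trigger it. */
static void demo_globalqueue_init(void)
{
    pthread_mutex_init(&demo_globalqueue.lock, NULL);
    demo_globalqueue.max_threads = 64;
    demo_globalqueue.min_threads = 64;
    demo_globalqueue.init_count++;
}

/* Returns the global queue, initializing it on first use. */
static DemoQueue *demo_get_globalqueue(void)
{
    int rc = pthread_once(&demo_globalqueue_once, demo_globalqueue_init);

    assert(rc == 0);
    (void)rc;           /* silence unused warning when NDEBUG is defined */
    return &demo_globalqueue;
}
```

With this shape, the unprotected "if (!globalqueue_init)" test would be
replaced by a call to the accessor, so concurrent first submissions cannot
initialize the queue twice. An alternative with the same effect is to
initialize the global queue once at startup, before any submitter can run.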