On Tue, Nov 9, 2010 at 12:06 PM, Arun R Bharadwaj <a...@linux.vnet.ibm.com> wrote:
> * Stefan Hajnoczi <stefa...@gmail.com> [2010-11-08 21:29:12]:
>
>> On Mon, Nov 8, 2010 at 2:33 PM, Arun R Bharadwaj
>> <a...@linux.vnet.ibm.com> wrote:
>> > diff --git a/Makefile.objs b/Makefile.objs
>> > index cd5a24b..3b7ec27 100644
>> > --- a/Makefile.objs
>> > +++ b/Makefile.objs
>> > @@ -9,6 +9,7 @@ qobject-obj-y += qerror.o
>> >
>> >  block-obj-y = cutils.o cache-utils.o qemu-malloc.o qemu-option.o module.o
>> >  block-obj-y += nbd.o block.o aio.o aes.o osdep.o qemu-config.o
>> > +block-obj-$(CONFIG_POSIX) += qemu-thread.o
>> >  block-obj-$(CONFIG_POSIX) += posix-aio-compat.o
>> >  block-obj-$(CONFIG_LINUX_AIO) += linux-aio.o
>> >
>> > @@ -124,7 +125,6 @@ endif
>> >  common-obj-y += $(addprefix ui/, $(ui-obj-y))
>> >
>> >  common-obj-y += iov.o acl.o
>> > -common-obj-$(CONFIG_THREAD) += qemu-thread.o
>> >  common-obj-y += notify.o event_notifier.o
>> >  common-obj-y += qemu-timer.o
>> >
> Hi Stefan,
>
> Thanks for the quick review.
>
>> This change makes CONFIG_THREAD unused.  The ./configure code that
>> sets CONFIG_THREAD=y should be removed.
>>
>
> I'll remove this.
>
>> > diff --git a/posix-aio-compat.c b/posix-aio-compat.c
>> > index 7b862b5..00b2a4e 100644
>> > --- a/posix-aio-compat.c
>> > +++ b/posix-aio-compat.c
>> > @@ -29,7 +29,32 @@
>> >  #include "block_int.h"
>> >
>> >  #include "block/raw-posix-aio.h"
>> > +#include "qemu-thread.h"
>> >
>> > +#define MAX_GLOBAL_THREADS  64
>> > +#define MIN_GLOBAL_THREADS  8
>> > +
>> > +QemuMutex aiocb_mutex;
>>
>> This variable should be static since it isn't used externally.
>>
>> > +
>> > +static void aio_thread(ThreadletWork *work)
>> >  {
>> >      pid_t pid;
>> > +    struct qemu_paiocb *aiocb = container_of(work, struct qemu_paiocb, work);
>> > +    ssize_t ret = 0;
>> >
>> >      pid = getpid();
>> > +    aiocb->active = 1;
>>
>> aiocb->active needs to be assigned with aiocb_mutex held and then released in
>> order for this memory write to be visible to other threads after this
>> line of code.
>>
>
> Yes. That makes sense. We definitely need to hold the mutex here.
>
>> >
>> >  static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
>> >  {
>> >      ssize_t ret;
>> >
>> > -    mutex_lock(&lock);
>> > +    qemu_mutex_lock(&aiocb_mutex);
>> >      ret = aiocb->ret;
>> > -    mutex_unlock(&lock);
>> > -
>> > +    qemu_mutex_unlock(&aiocb_mutex);
>> >      return ret;
>> >  }
>> >
>> > @@ -536,20 +619,20 @@ static void paio_cancel(BlockDriverAIOCB *blockacb)
>> >      struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
>> >      int active = 0;
>> >
>> > -    mutex_lock(&lock);
>> >      if (!acb->active) {
>> > -        QTAILQ_REMOVE(&request_list, acb, node);
>> > -        acb->ret = -ECANCELED;
>> > +        if (!dequeue_work(&acb->work)) {
>> > +            acb->ret = -ECANCELED;
>> > +        } else {
>> > +            active = 1;
>> > +        }
>> >      } else if (acb->ret == -EINPROGRESS) {
>> >          active = 1;
>> >      }
>> > -    mutex_unlock(&lock);
>> >
>> >      if (active) {
>> >          /* fail safe: if the aio could not be canceled, we wait for it */
>> > -        while (qemu_paio_error(acb) == EINPROGRESS)
>> > -            ;
>> > +        active = qemu_paio_error(acb);
>> >      }
>> >
>> >      paio_remove(acb);
>>
>> acb->ret is not being consistently accessed with aiocb_mutex held.
>>
>> We don't wait for the work item to complete if it is active.  This changes
>> the semantics of paio_cancel() and will break callers who expect the
>> request to be cancelled/completed when paio_cancel() returns.  Also, we go
>> ahead and free the acb for a running request which is dangerous because it
>> may be reused and corrupted.
>>
>> I think both the active variable and field in qemu_paiocb are unnecessary
>> because dequeue_work() already deals with inactive work items.  If
>> dequeue_work() was unsuccessful you need to wait until ret != -EINPROGRESS.
>>
>
> So this would mean that we can use the earlier infinite while loop
> right?
>
> while (qemu_paio_error(acb) == EINPROGRESS)
>     ;
>
> We can just take this outside the if (active) condition check.

I would go for something like this as paio_cancel():

    if (dequeue_work(&acb->work) != 0) {
        /* Wait for running work item to complete */
        mutex_lock(&aiocb_mutex);
        while (acb->ret == -EINPROGRESS) {
            cond_wait(&aiocb_completion, &aiocb_mutex);
        }
        mutex_unlock(&aiocb_mutex);
    }
    paio_remove(acb);

If the work item has not yet been run, then dequeue_work() returns 0
and we can just remove the acb.  If the work item is just running we
sleep on the completion condition variable until there is a return
value.  If the work item completes just as we're doing the check, then
the aiocb_mutex will synchronize things and we will not go to sleep
without detecting that the request has completed.

This approach requires introducing a condition variable.  We only need
a global aiocb_completion condition variable since there can only be
one cancellation happening at a time.

Stefan
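
For illustration, the wait-for-completion pattern described above can be
sketched with plain pthreads.  This is only a minimal sketch under stated
assumptions, not the QEMU code: struct fake_aiocb, worker(),
wait_for_completion() and cancel_demo() are hypothetical names, the
pthread_* calls stand in for the mutex_lock()/cond_wait() helpers in the
snippet, and the dequeue_work() step is left out, so the request is
always treated as already running.

```c
#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include <unistd.h>

/* Hypothetical stand-in for struct qemu_paiocb: only the field we wait on. */
struct fake_aiocb {
    int ret;    /* -EINPROGRESS until the worker publishes a result */
};

static pthread_mutex_t aiocb_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t aiocb_completion = PTHREAD_COND_INITIALIZER;

/* Worker side: pretend to do I/O, then publish the result under the mutex
 * and wake anyone sleeping on the completion condition variable. */
static void *worker(void *opaque)
{
    struct fake_aiocb *acb = opaque;

    usleep(10000);  /* simulate a request that is already running */

    pthread_mutex_lock(&aiocb_mutex);
    acb->ret = 0;   /* request completed successfully */
    pthread_cond_broadcast(&aiocb_completion);
    pthread_mutex_unlock(&aiocb_mutex);
    return NULL;
}

/* Canceller side: sleep until ret is no longer -EINPROGRESS.  The check and
 * the wait both happen with aiocb_mutex held, so a completion that races
 * with the check cannot be missed. */
static int wait_for_completion(struct fake_aiocb *acb)
{
    int ret;

    pthread_mutex_lock(&aiocb_mutex);
    while (acb->ret == -EINPROGRESS) {
        pthread_cond_wait(&aiocb_completion, &aiocb_mutex);
    }
    ret = acb->ret;
    pthread_mutex_unlock(&aiocb_mutex);
    return ret;
}

/* Runs one worker and one canceller; returns the final value of acb.ret. */
int cancel_demo(void)
{
    struct fake_aiocb acb = { .ret = -EINPROGRESS };
    pthread_t tid;
    int ret;

    if (pthread_create(&tid, NULL, worker, &acb) != 0) {
        return -EAGAIN;
    }
    ret = wait_for_completion(&acb);
    pthread_join(tid, NULL);    /* acb lives on our stack; join before return */
    return ret;
}
```

Calling cancel_demo() from a main() should return once the worker has
stored its result.  The while loop around pthread_cond_wait() also covers
spurious wakeups, and with a single canceller pthread_cond_signal() would
presumably work as well as the broadcast used here.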