On Fri, May 20, 2011 at 1:59 PM, Stefan Hajnoczi <stefa...@linux.vnet.ibm.com> wrote: > From: Kevin Wolf <kw...@redhat.com> > > Asynchronous code is becoming very complex. At the same time > synchronous code is growing because it is convenient to write. > Sometimes duplicate code paths are even added, one synchronous and the > other asynchronous. This patch introduces coroutines which allow code > that looks synchronous but is asynchronous under the covers. > > A coroutine has its own stack and is therefore able to preserve state > across blocking operations, which traditionally require callback > functions and manual marshalling of parameters. > > Creating and starting a coroutine is easy: > > coroutine = qemu_coroutine_create(my_coroutine); > qemu_coroutine_enter(coroutine, my_data); > > The coroutine then executes until it returns or yields: > > void coroutine_fn my_coroutine(void *opaque) { > MyData *my_data = opaque; > > /* do some work */ > > qemu_coroutine_yield(); > > /* do some more work */ > } > > Yielding switches control back to the caller of qemu_coroutine_enter(). > This is typically used to switch back to the main thread's event loop > after issuing an asynchronous I/O request. The request callback will > then invoke qemu_coroutine_enter() once more to switch back to the > coroutine. > > Note that if coroutines are used only from threads which hold the global > mutex they will never execute concurrently. This makes programming with > coroutines easier than with threads. Race conditions cannot occur since > only one coroutine may be active at any time. Other coroutines can only > run across yield. > > This coroutines implementation is based on the gtk-vnc implementation > written by Anthony Liguori <anth...@codemonkey.ws> but it has been > significantly rewritten by Kevin Wolf <kw...@redhat.com> to use > setjmp()/longjmp() instead of the more expensive swapcontext() and by > Paolo Bonzini <pbonz...@redhat.com> for Windows Fibers support. > > Signed-off-by: Kevin Wolf <kw...@redhat.com> > Signed-off-by: Stefan Hajnoczi <stefa...@linux.vnet.ibm.com> > --- > Makefile.objs | 7 ++ > coroutine-ucontext.c | 229 > ++++++++++++++++++++++++++++++++++++++++++++++++++ > coroutine-win32.c | 92 ++++++++++++++++++++ > qemu-coroutine-int.h | 48 +++++++++++ > qemu-coroutine.c | 75 ++++++++++++++++ > qemu-coroutine.h | 95 +++++++++++++++++++++ > trace-events | 5 + > 7 files changed, 551 insertions(+), 0 deletions(-) > create mode 100644 coroutine-ucontext.c > create mode 100644 coroutine-win32.c > create mode 100644 qemu-coroutine-int.h > create mode 100644 qemu-coroutine.c > create mode 100644 qemu-coroutine.h > > diff --git a/Makefile.objs b/Makefile.objs > index 4478c61..a8dbd15 100644 > --- a/Makefile.objs > +++ b/Makefile.objs > @@ -11,6 +11,12 @@ oslib-obj-$(CONFIG_WIN32) += oslib-win32.o > qemu-thread-win32.o > oslib-obj-$(CONFIG_POSIX) += oslib-posix.o qemu-thread-posix.o > > ####################################################################### > +# coroutines > +coroutine-obj-y = qemu-coroutine.o > +coroutine-obj-$(CONFIG_POSIX) += coroutine-ucontext.o > +coroutine-obj-$(CONFIG_WIN32) += coroutine-win32.o > + > +####################################################################### > # block-obj-y is code used by both qemu system emulation and qemu-img > > block-obj-y = cutils.o cache-utils.o qemu-malloc.o qemu-option.o module.o > async.o > @@ -67,6 +73,7 @@ common-obj-y += readline.o console.o cursor.o qemu-error.o > common-obj-y += $(oslib-obj-y) > common-obj-$(CONFIG_WIN32) += os-win32.o > common-obj-$(CONFIG_POSIX) += os-posix.o > +common-obj-y += $(coroutine-obj-y) > > common-obj-y += tcg-runtime.o host-utils.o > common-obj-y += irq.o ioport.o input.o > diff --git a/coroutine-ucontext.c b/coroutine-ucontext.c > new file mode 100644 > index 0000000..bcea2bd > --- /dev/null > +++ b/coroutine-ucontext.c > @@ -0,0 +1,229 @@ > +/* > + * ucontext coroutine initialization code > + * > + * Copyright (C) 2006 Anthony Liguori <anth...@codemonkey.ws> > + * Copyright (C) 2011 Kevin Wolf <kw...@redhat.com> > + * > + * This library is free software; you can redistribute it and/or > + * modify it under the terms of the GNU Lesser General Public > + * License as published by the Free Software Foundation; either > + * version 2.0 of the License, or (at your option) any later version. > + * > + * This library is distributed in the hope that it will be useful, > + * but WITHOUT ANY WARRANTY; without even the implied warranty of > + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU > + * Lesser General Public License for more details. > + * > + * You should have received a copy of the GNU Lesser General Public > + * License along with this library; if not, see > <http://www.gnu.org/licenses/>. > + */ > + > +/* XXX Is there a nicer way to disable glibc's stack check for longjmp? */ > +#ifdef _FORTIFY_SOURCE > +#undef _FORTIFY_SOURCE > +#endif > +#include <setjmp.h> > +#include <stdint.h> > +#include <pthread.h> > +#include <ucontext.h>
This would break OpenBSD build: CC coroutine-ucontext.o /src/qemu/coroutine-ucontext.c:28:22: warning: ucontext.h: No such file r directory /src/qemu/coroutine-ucontext.c: In function 'coroutine_new': /src/qemu/coroutine-ucontext.c:144: warning: implicit declaration of fun tion 'getcontext' /src/qemu/coroutine-ucontext.c:144: warning: nested extern declaration o 'getcontext' /src/qemu/coroutine-ucontext.c:152: error: 'ucontext_t' has no member na ed 'uc_link' /src/qemu/coroutine-ucontext.c:153: error: 'ucontext_t' has no member na ed 'uc_stack' /src/qemu/coroutine-ucontext.c:154: error: 'ucontext_t' has no member na ed 'uc_stack' /src/qemu/coroutine-ucontext.c:155: error: 'ucontext_t' has no member na ed 'uc_stack' /src/qemu/coroutine-ucontext.c:159: warning: implicit declaration of fun tion 'makecontext' /src/qemu/coroutine-ucontext.c:159: warning: nested extern declaration o 'makecontext' /src/qemu/coroutine-ucontext.c:164: warning: implicit declaration of fun tion 'swapcontext' /src/qemu/coroutine-ucontext.c:164: warning: nested extern declaration o 'swapcontext' Unfortunately these functions are not available on OpenBSD. I don't know which replacements can be used. What is gtk-vnc using on OpenBSD? > +#include "qemu-common.h" > +#include "qemu-coroutine-int.h" > + > +enum { > + /* Maximum free pool size prevents holding too many freed coroutines */ > + POOL_MAX_SIZE = 64, > +}; > + > +typedef struct { > + Coroutine base; > + void *stack; > + jmp_buf env; > +} CoroutineUContext; > + > +/** > + * Per-thread coroutine bookkeeping > + */ > +typedef struct { > + /** Currently executing coroutine */ > + Coroutine *current; > + > + /** Free list to speed up creation */ > + QLIST_HEAD(, Coroutine) pool; > + unsigned int pool_size; > + > + /** The default coroutine */ > + CoroutineUContext leader; > +} CoroutineThreadState; > + > +static pthread_key_t thread_state_key; > + > +/* > + * va_args to makecontext() must be type 'int', so passing > + * the pointer we need may require several int args. This > + * union is a quick hack to let us do that > + */ > +union cc_arg { > + void *p; > + int i[2]; > +}; > + > +static CoroutineThreadState *coroutine_get_thread_state(void) > +{ > + CoroutineThreadState *s = pthread_getspecific(thread_state_key); > + > + if (!s) { > + s = qemu_mallocz(sizeof(*s)); > + s->current = &s->leader.base; > + QLIST_INIT(&s->pool); > + pthread_setspecific(thread_state_key, s); > + } > + return s; > +} > + > +static void qemu_coroutine_thread_cleanup(void *opaque) > +{ > + CoroutineThreadState *s = opaque; > + Coroutine *co; > + Coroutine *tmp; > + > + QLIST_FOREACH_SAFE(co, &s->pool, pool_next, tmp) { > + qemu_free(DO_UPCAST(CoroutineUContext, base, co)->stack); > + qemu_free(co); > + } > + qemu_free(s); > +} > + > +static void __attribute__((constructor)) coroutine_init(void) > +{ > + int ret; > + > + ret = pthread_key_create(&thread_state_key, > qemu_coroutine_thread_cleanup); > + if (ret != 0) { > + fprintf(stderr, "unable to create leader key: %s\n", > strerror(errno)); > + abort(); > + } > +} > + > +static void coroutine_trampoline(int i0, int i1) > +{ > + union cc_arg arg; > + CoroutineUContext *self; > + Coroutine *co; > + > + arg.i[0] = i0; > + arg.i[1] = i1; > + self = arg.p; > + co = &self->base; > + > + /* Initialize longjmp environment and switch back the caller */ > + if (!setjmp(self->env)) { > + longjmp(*(jmp_buf *)co->entry_arg, 1); > + } > + > + while (true) { > + co->entry(co->entry_arg); > + qemu_coroutine_switch(co, co->caller, COROUTINE_TERMINATE); > + } > +} > + > +static Coroutine *coroutine_new(void) > +{ > + const size_t stack_size = 4 << 20; > + CoroutineUContext *co; > + ucontext_t old_uc, uc; > + jmp_buf old_env; > + union cc_arg arg; > + > + /* The ucontext functions preserve signal masks which incurs a system > call > + * overhead. setjmp()/longjmp() does not preserve signal masks but only > + * works on the current stack. Since we need a way to create and switch > to > + * a new stack, use the ucontext functions for that but > setjmp()/longjmp() > + * for everything else. > + */ > + > + if (getcontext(&uc) == -1) { > + return NULL; > + } > + > + co = qemu_mallocz(sizeof(*co)); > + co->stack = qemu_malloc(stack_size); > + co->base.entry_arg = &old_env; /* stash away our jmp_buf */ > + > + uc.uc_link = &old_uc; > + uc.uc_stack.ss_sp = co->stack; > + uc.uc_stack.ss_size = stack_size; > + uc.uc_stack.ss_flags = 0; > + > + arg.p = co; > + > + makecontext(&uc, (void (*)(void))coroutine_trampoline, > + 2, arg.i[0], arg.i[1]); > + > + /* swapcontext() in, longjmp() back out */ > + if (!setjmp(old_env)) { > + swapcontext(&old_uc, &uc); > + } > + return &co->base; > +} > + > +Coroutine *qemu_coroutine_new(void) > +{ > + CoroutineThreadState *s = coroutine_get_thread_state(); > + Coroutine *co; > + > + co = QLIST_FIRST(&s->pool); > + if (co) { > + QLIST_REMOVE(co, pool_next); > + s->pool_size--; > + } else { > + co = coroutine_new(); > + } > + return co; > +} > + > +void qemu_coroutine_delete(Coroutine *co_) > +{ > + CoroutineThreadState *s = coroutine_get_thread_state(); > + CoroutineUContext *co = DO_UPCAST(CoroutineUContext, base, co_); > + > + if (s->pool_size < POOL_MAX_SIZE) { > + QLIST_INSERT_HEAD(&s->pool, &co->base, pool_next); > + co->base.caller = NULL; > + s->pool_size++; > + return; > + } > + > + qemu_free(co->stack); > + qemu_free(co); > +} > + > +CoroutineAction qemu_coroutine_switch(Coroutine *from_, Coroutine *to_, > + CoroutineAction action) > +{ > + CoroutineUContext *from = DO_UPCAST(CoroutineUContext, base, from_); > + CoroutineUContext *to = DO_UPCAST(CoroutineUContext, base, to_); > + CoroutineThreadState *s = coroutine_get_thread_state(); > + int ret; > + > + s->current = to_; > + > + ret = setjmp(from->env); > + if (ret == 0) { > + longjmp(to->env, action); > + } > + return ret; > +} > + > +Coroutine *qemu_coroutine_self(void) > +{ > + CoroutineThreadState *s = coroutine_get_thread_state(); > + > + return s->current; > +} > + > +bool qemu_in_coroutine(void) > +{ > + CoroutineThreadState *s = pthread_getspecific(thread_state_key); > + > + return s && s->current->caller; > +} > diff --git a/coroutine-win32.c b/coroutine-win32.c > new file mode 100644 > index 0000000..2215ae5 > --- /dev/null > +++ b/coroutine-win32.c > @@ -0,0 +1,92 @@ > +/* > + * Win32 coroutine initialization code > + * > + * Copyright (c) 2011 Kevin Wolf <kw...@redhat.com> > + * > + * Permission is hereby granted, free of charge, to any person obtaining a > copy > + * of this software and associated documentation files (the "Software"), to > deal > + * in the Software without restriction, including without limitation the > rights > + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell > + * copies of the Software, and to permit persons to whom the Software is > + * furnished to do so, subject to the following conditions: > + * > + * The above copyright notice and this permission notice shall be included in > + * all copies or substantial portions of the Software. > + * > + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR > + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, > + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL > + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER > + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING > FROM, > + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN > + * THE SOFTWARE. > + */ > + > +#include "qemu-common.h" > +#include "qemu-coroutine-int.h" > + > +typedef struct > +{ ERROR: open brace '{' following struct go on the same line #397: FILE: coroutine-win32.c:29: +typedef struct +{ total: 1 errors, 0 warnings, 566 lines checked Your patch has style problems, please review. If any of these errors are false positives report them to the maintainer, see CHECKPATCH in MAINTAINERS. > + Coroutine base; > + > + LPVOID fiber; > + CoroutineAction action; > +} CoroutineWin32; > + > +static __thread CoroutineWin32 leader; > +static __thread Coroutine *current; > + > +CoroutineAction qemu_coroutine_switch(Coroutine *from_, Coroutine *to_, > + CoroutineAction action) > +{ > + CoroutineWin32 *from = DO_UPCAST(CoroutineWin32, base, from_); > + CoroutineWin32 *to = DO_UPCAST(CoroutineWin32, base, to_); > + > + current = to_; > + > + to->action = action; > + SwitchToFiber(to->fiber); > + return from->action; > +} > + > +static void CALLBACK coroutine_trampoline(void *co_) > +{ > + Coroutine *co = co_; > + > + while (true) { > + co->entry(co->entry_arg); > + qemu_coroutine_switch(co, co->caller, COROUTINE_TERMINATE); > + } > +} > + > +Coroutine *qemu_coroutine_new(void) > +{ > + const size_t stack_size = 4 << 20; > + CoroutineWin32 *co; > + > + co = qemu_mallocz(sizeof(*co)); > + co->fiber = CreateFiber(stack_size, coroutine_trampoline, &co->base); > + return &co->base; > +} > + > +void qemu_coroutine_delete(Coroutine *co_) > +{ > + CoroutineWin32 *co = DO_UPCAST(CoroutineWin32, base, co_); > + > + DeleteFiber(co->fiber); > + qemu_free(co); > +} > + > +Coroutine *qemu_coroutine_self(void) > +{ > + if (!current) { > + current = &leader.base; > + leader.fiber = ConvertThreadToFiber(NULL); > + } > + return current; > +} > + > +bool qemu_in_coroutine(void) > +{ > + return current && current->caller; > +} > diff --git a/qemu-coroutine-int.h b/qemu-coroutine-int.h > new file mode 100644 > index 0000000..64915c2 > --- /dev/null > +++ b/qemu-coroutine-int.h > @@ -0,0 +1,48 @@ > +/* > + * Coroutine internals > + * > + * Copyright (c) 2011 Kevin Wolf <kw...@redhat.com> > + * > + * Permission is hereby granted, free of charge, to any person obtaining a > copy > + * of this software and associated documentation files (the "Software"), to > deal > + * in the Software without restriction, including without limitation the > rights > + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell > + * copies of the Software, and to permit persons to whom the Software is > + * furnished to do so, subject to the following conditions: > + * > + * The above copyright notice and this permission notice shall be included in > + * all copies or substantial portions of the Software. > + * > + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR > + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, > + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL > + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER > + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING > FROM, > + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN > + * THE SOFTWARE. > + */ > + > +#ifndef QEMU_COROUTINE_INT_H > +#define QEMU_COROUTINE_INT_H > + > +#include "qemu-queue.h" > +#include "qemu-coroutine.h" > + > +typedef enum { > + COROUTINE_YIELD = 1, > + COROUTINE_TERMINATE = 2, > +} CoroutineAction; > + > +struct Coroutine { > + CoroutineEntry *entry; > + void *entry_arg; > + Coroutine *caller; > + QLIST_ENTRY(Coroutine) pool_next; > +}; > + > +Coroutine *qemu_coroutine_new(void); > +void qemu_coroutine_delete(Coroutine *co); > +CoroutineAction qemu_coroutine_switch(Coroutine *from, Coroutine *to, > + CoroutineAction action); > + > +#endif > diff --git a/qemu-coroutine.c b/qemu-coroutine.c > new file mode 100644 > index 0000000..600be26 > --- /dev/null > +++ b/qemu-coroutine.c > @@ -0,0 +1,75 @@ > +/* > + * QEMU coroutines > + * > + * Copyright IBM, Corp. 2011 > + * > + * Authors: > + * Stefan Hajnoczi <stefa...@linux.vnet.ibm.com> > + * Kevin Wolf <kw...@redhat.com> > + * > + * This work is licensed under the terms of the GNU LGPL, version 2 or later. > + * See the COPYING.LIB file in the top-level directory. > + * > + */ > + > +#include "trace.h" > +#include "qemu-common.h" > +#include "qemu-coroutine.h" > +#include "qemu-coroutine-int.h" > + > +Coroutine *qemu_coroutine_create(CoroutineEntry *entry) > +{ > + Coroutine *co = qemu_coroutine_new(); > + co->entry = entry; > + return co; > +} > + > +static void coroutine_swap(Coroutine *from, Coroutine *to) > +{ > + CoroutineAction ret; > + > + ret = qemu_coroutine_switch(from, to, COROUTINE_YIELD); > + > + switch (ret) { > + case COROUTINE_YIELD: > + return; > + case COROUTINE_TERMINATE: > + trace_qemu_coroutine_terminate(to); > + qemu_coroutine_delete(to); > + return; > + default: > + abort(); > + } > +} > + > +void qemu_coroutine_enter(Coroutine *co, void *opaque) > +{ > + Coroutine *self = qemu_coroutine_self(); > + > + trace_qemu_coroutine_enter(self, co, opaque); > + > + if (co->caller) { > + fprintf(stderr, "Co-routine re-entered recursively\n"); > + abort(); > + } > + > + co->caller = self; > + co->entry_arg = opaque; > + coroutine_swap(self, co); > +} > + > +void coroutine_fn qemu_coroutine_yield(void) > +{ > + Coroutine *self = qemu_coroutine_self(); > + Coroutine *to = self->caller; > + > + trace_qemu_coroutine_yield(self, to); > + > + if (!to) { > + fprintf(stderr, "Co-routine is yielding to no one\n"); > + abort(); > + } > + > + self->caller = NULL; > + coroutine_swap(self, to); > +} > diff --git a/qemu-coroutine.h b/qemu-coroutine.h > new file mode 100644 > index 0000000..08255c7 > --- /dev/null > +++ b/qemu-coroutine.h > @@ -0,0 +1,95 @@ > +/* > + * QEMU coroutine implementation > + * > + * Copyright IBM, Corp. 2011 > + * > + * Authors: > + * Stefan Hajnoczi <stefa...@linux.vnet.ibm.com> > + * > + * This work is licensed under the terms of the GNU LGPL, version 2 or later. > + * See the COPYING.LIB file in the top-level directory. > + * > + */ > + > +#ifndef QEMU_COROUTINE_H > +#define QEMU_COROUTINE_H > + > +#include <stdbool.h> > + > +/** > + * Coroutines are a mechanism for stack switching and can be used for > + * cooperative userspace threading. These functions provide a simple but > + * useful flavor of coroutines that is suitable for writing sequential code, > + * rather than callbacks, for operations that need to give up control while > + * waiting for events to complete. > + * > + * These functions are re-entrant and may be used outside the global mutex. > + */ > + > +/** > + * Mark a function that executes in coroutine context > + * > + * Functions that execute in coroutine context cannot be called directly from > + * normal functions. In the future it would be nice to enable compiler or > + * static checker support for catching such errors. This annotation might > make > + * it possible and in the meantime it serves as documentation. > + * > + * For example: > + * > + * static void coroutine_fn foo(void) { > + * .... > + * } > + */ > +#define coroutine_fn > + > +typedef struct Coroutine Coroutine; > + > +/** > + * Coroutine entry point > + * > + * When the coroutine is entered for the first time, opaque is passed in as > an > + * argument. > + * > + * When this function returns, the coroutine is destroyed automatically and > + * execution continues in the caller who last entered the coroutine. > + */ > +typedef void coroutine_fn CoroutineEntry(void *opaque); > + > +/** > + * Create a new coroutine > + * > + * Use qemu_coroutine_enter() to actually transfer control to the coroutine. > + */ > +Coroutine *qemu_coroutine_create(CoroutineEntry *entry); > + > +/** > + * Transfer control to a coroutine > + * > + * The opaque argument is passed as the argument to the entry point when > + * entering the coroutine for the first time. It is subsequently ignored. > + */ > +void qemu_coroutine_enter(Coroutine *coroutine, void *opaque); > + > +/** > + * Transfer control back to a coroutine's caller > + * > + * This function does not return until the coroutine is re-entered using > + * qemu_coroutine_enter(). > + */ > +void coroutine_fn qemu_coroutine_yield(void); > + > +/** > + * Get the currently executing coroutine > + */ > +Coroutine *coroutine_fn qemu_coroutine_self(void); > + > +/** > + * Return whether or not currently inside a coroutine > + * > + * This can be used to write functions that work both when in coroutine > context > + * and when not in coroutine context. Note that such functions cannot use > the > + * coroutine_fn annotation since they work outside coroutine context. > + */ > +bool qemu_in_coroutine(void); > + > +#endif /* QEMU_COROUTINE_H */ > diff --git a/trace-events b/trace-events > index 385cb00..e21e67d 100644 > --- a/trace-events > +++ b/trace-events > @@ -377,3 +377,8 @@ disable xen_unmap_block(void* addr, unsigned long size) > "%p, size %#lx" > > # exec.c > disable qemu_put_ram_ptr(void* addr) "%p" > + > +# qemu-coroutine.c > +disable qemu_coroutine_enter(void *from, void *to, void *opaque) "from %p to > %p opaque %p" > +disable qemu_coroutine_yield(void *from, void *to) "from %p to %p" > +disable qemu_coroutine_terminate(void *co) "self %p" > -- > 1.7.4.4 > >