On 12/08/2015 15:38, Paolo Bonzini wrote: > QemuEvents are used heavily by call_rcu. We do not want them to be slow, > but the current implementation does a kernel call on every invocation > of qemu_event_* and won't cut it. > > So, wrap a Win32 manual-reset event with a fast userspace path. The > states and transitions are the same as for the futex and mutex/condvar > implementations, but the slow path is different of course. The idea > is to reset the Win32 event lazily, as part of a test-reset-test-wait > sequence. Such a sequence is, indeed, how QemuEvents are used by > RCU and other subsystems! > > The patch includes a formal model of the algorithm. > > Signed-off-by: Paolo Bonzini <pbonz...@redhat.com> > --- > docs/win32-qemu-event.promela | 98 > +++++++++++++++++++++++++++++++++++++++++++ > include/qemu/thread-win32.h | 1 + > util/qemu-thread-win32.c | 66 +++++++++++++++++++++++++++-- > 3 files changed, 161 insertions(+), 4 deletions(-) > create mode 100644 docs/win32-qemu-event.promela > > diff --git a/docs/win32-qemu-event.promela b/docs/win32-qemu-event.promela > new file mode 100644 > index 0000000..c446a71 > --- /dev/null > +++ b/docs/win32-qemu-event.promela > @@ -0,0 +1,98 @@ > +/* > + * This model describes the implementation of QemuEvent in > + * util/qemu-thread-win32.c. > + * > + * Author: Paolo Bonzini <pbonz...@redhat.com> > + * > + * This file is in the public domain. If you really want a license, > + * the WTFPL will do. > + * > + * To verify it: > + * spin -a docs/event.promela > + * gcc -O2 pan.c -DSAFETY > + * ./a.out > + */ > + > +bool event; > +int value; > + > +/* Primitives for a Win32 event */ > +#define RAW_RESET event = false > +#define RAW_SET event = true > +#define RAW_WAIT do :: event -> break; od > + > +#if 0 > +/* Basic sanity checking: test the Win32 event primitives */ > +#define RESET RAW_RESET > +#define SET RAW_SET > +#define WAIT RAW_WAIT > +#else > +/* Full model: layer a userspace-only fast path on top of the RAW_* > + * primitives. SET/RESET/WAIT have exactly the same semantics as > + * RAW_SET/RAW_RESET/RAW_WAIT, but try to avoid invoking them. > + */ > +#define EV_SET 0 > +#define EV_FREE 1 > +#define EV_BUSY -1 > + > +int state = EV_FREE; > + > +int xchg_result; > +#define SET if :: state != EV_SET -> \ > + atomic { /* xchg_result=xchg(state, EV_SET) */ \ > + xchg_result = state; \ > + state = EV_SET; \ > + } \ > + if :: xchg_result == EV_BUSY -> RAW_SET; \ > + :: else -> skip; \ > + fi; \ > + :: else -> skip; \ > + fi > + > +#define RESET if :: state == EV_SET -> atomic { state = state | EV_FREE; } \ > + :: else -> skip; \ > + fi > + > +int tmp1, tmp2; > +#define WAIT tmp1 = state; \ > + if :: tmp1 != EV_SET -> \ > + if :: tmp1 == EV_FREE -> \ > + RAW_RESET; \ > + atomic { /* tmp2=cas(state, EV_FREE, EV_BUSY) */ \ > + tmp2 = state; \ > + if :: tmp2 == EV_FREE -> state = EV_BUSY; \ > + :: else -> skip; \ > + fi; \ > + } \ > + if :: tmp2 == EV_SET -> tmp1 = EV_SET; \ > + :: else -> tmp1 = EV_BUSY; \ > + fi; \ > + :: else -> skip; \ > + fi; \ > + assert(tmp1 != EV_FREE); \ > + if :: tmp1 == EV_BUSY -> RAW_WAIT; \ > + :: else -> skip; \ > + fi; \ > + :: else -> skip; \ > + fi > +#endif > + > +active proctype waiter() > +{ > + if > + :: !value -> > + RESET; > + if > + :: !value -> WAIT; > + :: else -> skip; > + fi; > + :: else -> skip; > + fi; > + assert(value); > +} > + > +active proctype notifier() > +{ > + value = true; > + SET; > +} > diff --git a/include/qemu/thread-win32.h b/include/qemu/thread-win32.h > index 3d58081..385ff5f 100644 > --- a/include/qemu/thread-win32.h > +++ b/include/qemu/thread-win32.h > @@ -18,6 +18,7 @@ struct QemuSemaphore { > }; > > struct QemuEvent { > + int value; > HANDLE event; > }; > > diff --git a/util/qemu-thread-win32.c b/util/qemu-thread-win32.c > index 406b52f..6cdd553 100644 > --- a/util/qemu-thread-win32.c > +++ b/util/qemu-thread-win32.c > @@ -238,10 +238,34 @@ void qemu_sem_wait(QemuSemaphore *sem) > } > } > > +/* Wrap a Win32 manual-reset event with a fast userspace path. The idea > + * is to reset the Win32 event lazily, as part of a test-reset-test-wait > + * sequence. Such a sequence is, indeed, how QemuEvents are used by > + * RCU and other subsystems! > + * > + * Valid transitions: > + * - free->set, when setting the event > + * - busy->set, when setting the event, followed by futex_wake > + * - set->free, when resetting the event > + * - free->busy, when waiting > + * > + * set->busy does not happen (it can be observed from the outside but > + * it really is set->free->busy). > + * > + * busy->free provably cannot happen; to enforce it, the set->free transition > + * is done with an OR, which becomes a no-op if the event has concurrently > + * transitioned to free or busy (and is faster than cmpxchg). > + */ > + > +#define EV_SET 0 > +#define EV_FREE 1 > +#define EV_BUSY -1 > + > void qemu_event_init(QemuEvent *ev, bool init) > { > /* Manual reset. */ > - ev->event = CreateEvent(NULL, TRUE, init, NULL); > + ev->event = CreateEvent(NULL, TRUE, TRUE, NULL); > + ev->value = (init ? EV_SET : EV_FREE); > } > > void qemu_event_destroy(QemuEvent *ev) > @@ -251,17 +275,51 @@ void qemu_event_destroy(QemuEvent *ev) > > void qemu_event_set(QemuEvent *ev) > { > - SetEvent(ev->event); > + if (atomic_mb_read(&ev->value) != EV_SET) { > + if (atomic_xchg(&ev->value, EV_SET) == EV_BUSY) { > + /* There were waiters, wake them up. */ > + SetEvent(ev->event); > + } > + } > } > > void qemu_event_reset(QemuEvent *ev) > { > - ResetEvent(ev->event); > + if (atomic_mb_read(&ev->value) == EV_SET) { > + /* If there was a concurrent reset (or even reset+wait), > + * do nothing. Otherwise change EV_SET->EV_FREE. > + */ > + atomic_or(&ev->value, EV_FREE); > + } > } > > void qemu_event_wait(QemuEvent *ev) > { > - WaitForSingleObject(ev->event, INFINITE); > + unsigned value; > + > + value = atomic_mb_read(&ev->value); > + if (value != EV_SET) { > + if (value == EV_FREE) { > + /* qemu_event_set is not yet going to call SetEvent, but we are > + * going to do another check for EV_SET below when setting > EV_BUSY. > + * At that point it is safe to call WaitForSingleObject. > + */ > + ResetEvent(ev->event); > + > + /* Tell qemu_event_set that there are waiters. No need to retry > + * because there cannot be a concurent busy->free transition. > + * After the CAS, the event will be either set or busy. > + */ > + if (atomic_cmpxchg(&ev->value, EV_FREE, EV_BUSY) == EV_SET) { > + value = EV_SET; > + } else { > + value = EV_BUSY; > + } > + } > + if (value == EV_BUSY) { > + WaitForSingleObject(ev->event, INFINITE); > + } > + } > } > > struct QemuThreadData { >
Ping?